Commit bffcfba1 authored by Dustin L. Howett's avatar Dustin L. Howett

Fix some concurrency issues in Expirator.

parent eb721633
......@@ -78,6 +78,7 @@ type Expirator struct {
urgentFlushRequired bool
errorChannelSend chan<- error
wg sync.WaitGroup
mu sync.RWMutex
}
// Expirable represents any object that might be expired.
......@@ -139,6 +140,9 @@ func (e *Expirator) loadExpirations() error {
}
func (e *Expirator) saveExpirations() error {
e.mu.Lock()
defer e.mu.Unlock()
if e.expirationMap == nil {
return nil
}
......@@ -153,6 +157,9 @@ func (e *Expirator) saveExpirations() error {
}
func (e *Expirator) registerExpirationHandle(ex *Handle) {
e.mu.Lock()
defer e.mu.Unlock()
expiryFunc := func() { e.expirationChannel <- ex }
if ex.expirationTimer != nil {
......@@ -207,30 +214,37 @@ func (e *Expirator) run() {
}
}
case expiration := <-e.expirationChannel:
e.mu.Lock()
delete(e.expirationMap.m, expiration.id)
e.flushRequired = true
e.mu.Unlock()
if expirable := e.store.GetExpirable(expiration.id); expirable != nil {
e.store.DestroyExpirable(expirable)
}
e.flushRequired = true
}
}
}
// ExpireObject registers an object for expiration after a given duration.
func (e *Expirator) ExpireObject(ex Expirable, dur time.Duration) {
e.mu.RLock()
id := ex.ExpirationID()
exh, ok := e.expirationMap.m[id]
if !ok {
exh = &Handle{id: id}
}
exh.expirationTime = time.Now().Add(dur)
e.mu.RUnlock()
e.registerExpirationHandle(exh)
}
// CancelObjectExpiration stays the execution of an object, or does nothing if the given object was not going to be executed.
func (e *Expirator) CancelObjectExpiration(ex Expirable) {
e.mu.Lock()
defer e.mu.Unlock()
id := ex.ExpirationID()
exh, ok := e.expirationMap.m[id]
if ok {
......@@ -240,6 +254,8 @@ func (e *Expirator) CancelObjectExpiration(ex Expirable) {
// ObjectHasExpiration returns whether the given object has a registered expiration.
func (e *Expirator) ObjectHasExpiration(ex Expirable) bool {
e.mu.RLock()
defer e.mu.RUnlock()
id := ex.ExpirationID()
_, ok := e.expirationMap.m[id]
return ok
......@@ -247,6 +263,8 @@ func (e *Expirator) ObjectHasExpiration(ex Expirable) bool {
// Len returns the number of objects registered for expiration.
func (e *Expirator) Len() (l int) {
e.mu.RLock()
defer e.mu.RUnlock()
l = 0
if e.expirationMap != nil {
l = len(e.expirationMap.m)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment