Commit af12bc6a authored by Dustin L. Howett's avatar Dustin L. Howett

Overhaul how expirations are stored.

Storage is now done through a StorageAdapter interface, which specifies
how to save and load expirations. There are two adapters provided: one
that saves nowhere and one that saves in a gob file (to mimic the
original behaviour of this package.)

Code using the old expiration store will automatically migrate to the
new store format on load/save.
parent 6e523e0a
......@@ -4,32 +4,80 @@
package gotimeout
import (
"bytes"
"encoding/gob"
"os"
"sync"
"time"
)
// ExpirableID provides Opaque identification for an Expirable object.
type ExpirableID string
type expirationHandle struct {
ExpirationTime time.Time
ID ExpirableID
// Handle is an opaque token marking an object for destruction.
type Handle struct {
expirationTime time.Time
id ExpirableID
expirationTimer *time.Timer
}
func (h *Handle) MarshalBinary() ([]byte, error) {
b := &bytes.Buffer{}
enc := gob.NewEncoder(b)
enc.Encode(string(h.id))
enc.Encode(h.expirationTime)
return b.Bytes(), nil
}
func (h *Handle) UnmarshalBinary(b []byte) error {
r := bytes.NewReader(b)
dec := gob.NewDecoder(r)
var s string
dec.Decode(&s)
h.id = ExpirableID(s)
dec.Decode(&h.expirationTime)
return nil
}
// HandleMap is an opaque storage structure for expiring objects.
type HandleMap struct {
m map[ExpirableID]*Handle
}
func newHandleMap() *HandleMap {
return &HandleMap{
m: make(map[ExpirableID]*Handle),
}
}
func (h *HandleMap) MarshalBinary() ([]byte, error) {
b := &bytes.Buffer{}
enc := gob.NewEncoder(b)
enc.Encode(h.m)
return b.Bytes(), nil
}
func (h *HandleMap) UnmarshalBinary(b []byte) error {
r := bytes.NewReader(b)
dec := gob.NewDecoder(r)
dec.Decode(&h.m)
return nil
}
// Expirator provides the primary mechanism of gotimeout: an expiration daemon.
type Expirator struct {
// A channel upon which a client can receive error messages from the daemon.
ErrorChannel <-chan error
adapter StorageAdapter
store ExpirableStore
dataPath string
expirationMap map[ExpirableID]*expirationHandle
expirationChannel chan *expirationHandle
expirationMap *HandleMap
expirationChannel chan *Handle
flushRequired bool
urgentFlushRequired bool
errorChannelSend chan<- error
wg sync.WaitGroup
}
// Expirable represents any object that might be expired.
......@@ -46,10 +94,23 @@ type ExpirableStore interface {
// path may be "", denoting that this Expirator should not save anything. The returned expiration daemon will have
// already been started.
func NewExpirator(path string, store ExpirableStore) *Expirator {
var sa StorageAdapter
if path == "" {
sa = NoopAdapter{}
} else {
sa = &legacyUpgradingGobFileAdapter{NewGobFileAdapter(path)}
}
return NewExpiratorWithStorage(sa, store)
}
// NewExpiratorWithStorage returns a new Expirator given a store and means with which to store expiration metadata.
// The returned expiration daemon will have already been started.
func NewExpiratorWithStorage(sa StorageAdapter, store ExpirableStore) *Expirator {
e := &Expirator{
adapter: sa,
store: store,
dataPath: path,
expirationChannel: make(chan *expirationHandle, 1000),
expirationChannel: make(chan *Handle, 1000),
expirationMap: newHandleMap(),
}
errorChannel := make(chan error, 1)
......@@ -60,31 +121,17 @@ func NewExpirator(path string, store ExpirableStore) *Expirator {
return e
}
func (e *Expirator) canSave() bool {
return e.dataPath != ""
}
func (e *Expirator) loadExpirations() error {
if !e.canSave() {
return nil
}
file, err := os.Open(e.dataPath)
hm, err := e.adapter.Load()
if err != nil {
// Being unable to open the file is not to be considered an error condition:
// the file not existing means that we simply haven't written anything yet.
return nil
return err
}
defer file.Close()
gobDecoder := gob.NewDecoder(file)
tempMap := make(map[ExpirableID]*expirationHandle)
err = gobDecoder.Decode(&tempMap)
if err != nil {
return err
if hm == nil {
return nil
}
for _, v := range tempMap {
for _, v := range hm.m {
e.registerExpirationHandle(v)
}
......@@ -92,22 +139,10 @@ func (e *Expirator) loadExpirations() error {
}
func (e *Expirator) saveExpirations() error {
if !e.canSave() {
return nil
}
if e.expirationMap == nil {
return nil
}
file, err := os.Create(e.dataPath)
if err != nil {
return err
}
defer file.Close()
gobEncoder := gob.NewEncoder(file)
err = gobEncoder.Encode(e.expirationMap)
err := e.adapter.Save(e.expirationMap)
if err != nil {
return err
}
......@@ -117,45 +152,44 @@ func (e *Expirator) saveExpirations() error {
return nil
}
func (e *Expirator) registerExpirationHandle(ex *expirationHandle) {
func (e *Expirator) registerExpirationHandle(ex *Handle) {
expiryFunc := func() { e.expirationChannel <- ex }
if e.expirationMap == nil {
e.expirationMap = make(map[ExpirableID]*expirationHandle)
}
if ex.expirationTimer != nil {
e.cancelExpirationHandle(ex)
}
now := time.Now()
if ex.ExpirationTime.After(now) {
e.expirationMap[ex.ID] = ex
if ex.expirationTime.After(now) {
e.expirationMap.m[ex.id] = ex
e.urgentFlushRequired = true
ex.expirationTimer = time.AfterFunc(ex.ExpirationTime.Sub(now), expiryFunc)
ex.expirationTimer = time.AfterFunc(ex.expirationTime.Sub(now), expiryFunc)
} else {
expiryFunc()
}
}
func (e *Expirator) cancelExpirationHandle(ex *expirationHandle) {
func (e *Expirator) cancelExpirationHandle(ex *Handle) {
ex.expirationTimer.Stop()
delete(e.expirationMap, ex.ID)
delete(e.expirationMap.m, ex.id)
e.urgentFlushRequired = true
}
func (e *Expirator) run() {
e.wg.Add(1)
go func() {
if err := e.loadExpirations(); err != nil {
e.errorChannelSend <- err
}
e.wg.Done()
}()
var flushTickerChan, urgentFlushTickerChan <-chan time.Time
if e.canSave() {
if e.adapter.RequiresFlush() {
flushTickerChan, urgentFlushTickerChan = time.NewTicker(30*time.Second).C, time.NewTicker(1*time.Second).C
}
e.wg.Wait()
for {
select {
// 30-second flush timer (only save if changed)
......@@ -173,9 +207,9 @@ func (e *Expirator) run() {
}
}
case expiration := <-e.expirationChannel:
delete(e.expirationMap, expiration.ID)
delete(e.expirationMap.m, expiration.id)
if expirable := e.store.GetExpirable(expiration.ID); expirable != nil {
if expirable := e.store.GetExpirable(expiration.id); expirable != nil {
e.store.DestroyExpirable(expirable)
}
......@@ -187,18 +221,18 @@ func (e *Expirator) run() {
// ExpireObject registers an object for expiration after a given duration.
func (e *Expirator) ExpireObject(ex Expirable, dur time.Duration) {
id := ex.ExpirationID()
exh, ok := e.expirationMap[id]
exh, ok := e.expirationMap.m[id]
if !ok {
exh = &expirationHandle{ID: id}
exh = &Handle{id: id}
}
exh.ExpirationTime = time.Now().Add(dur)
exh.expirationTime = time.Now().Add(dur)
e.registerExpirationHandle(exh)
}
// CancelObjectExpiration stays the execution of an object, or does nothing if the given object was not going to be executed.
func (e *Expirator) CancelObjectExpiration(ex Expirable) {
id := ex.ExpirationID()
exh, ok := e.expirationMap[id]
exh, ok := e.expirationMap.m[id]
if ok {
e.cancelExpirationHandle(exh)
}
......@@ -207,7 +241,7 @@ func (e *Expirator) CancelObjectExpiration(ex Expirable) {
// ObjectHasExpiration returns whether the given object has a registered expiration.
func (e *Expirator) ObjectHasExpiration(ex Expirable) bool {
id := ex.ExpirationID()
_, ok := e.expirationMap[id]
_, ok := e.expirationMap.m[id]
return ok
}
......@@ -215,7 +249,7 @@ func (e *Expirator) ObjectHasExpiration(ex Expirable) bool {
func (e *Expirator) Len() (l int) {
l = 0
if e.expirationMap != nil {
l = len(e.expirationMap)
l = len(e.expirationMap.m)
}
return
}
package gotimeout
import (
"encoding/gob"
"os"
"time"
)
type legacyUpgradingGobFileAdapter struct {
*GobFileAdapter
}
func (a *legacyUpgradingGobFileAdapter) Load() (*HandleMap, error) {
hm, err := a.GobFileAdapter.Load()
if err == nil {
return hm, err
}
return a.attemptUpgrade()
}
func (a *legacyUpgradingGobFileAdapter) attemptUpgrade() (*HandleMap, error) {
file, err := os.Open(a.GobFileAdapter.filename)
if err != nil {
return nil, err
}
defer file.Close()
var oldMap map[ExpirableID]struct {
ExpirationTime time.Time
ID ExpirableID
}
dec := gob.NewDecoder(file)
err = dec.Decode(&oldMap)
if err != nil {
return nil, err
}
hm := newHandleMap()
for k, v := range oldMap {
hm.m[k] = &Handle{v.ExpirationTime, v.ID, nil}
}
return hm, nil
}
package gotimeout
import (
"encoding/gob"
"os"
)
// StorageAdapter is the interface through which expiration storage is abstracted.
//
// Save stores a set of expiration handles, and Load returns them unharmed.
//
// Implementation that care about saving their changes in a timely manner should
// return true from RequiresFlush.
type StorageAdapter interface {
RequiresFlush() bool
Save(*HandleMap) error
Load() (*HandleMap, error)
}
// GobFileAdapter is a StorageAdapter that saves expiration handles in a file via the encoding/gob package.
type GobFileAdapter struct {
filename string
}
func (a *GobFileAdapter) RequiresFlush() bool {
return true
}
func (a *GobFileAdapter) Save(hm *HandleMap) error {
file, err := os.Create(a.filename)
if err != nil {
return err
}
defer file.Close()
enc := gob.NewEncoder(file)
err = enc.Encode(hm)
if err != nil {
return err
}
return nil
}
func (a *GobFileAdapter) Load() (*HandleMap, error) {
file, err := os.Open(a.filename)
if err != nil {
return nil, err
}
defer file.Close()
dec := gob.NewDecoder(file)
var hm *HandleMap
err = dec.Decode(&hm)
if err != nil {
return nil, err
}
return hm, nil
}
func NewGobFileAdapter(filename string) *GobFileAdapter {
return &GobFileAdapter{filename: filename}
}
// NoopAdapter is a StorageAdapter that does not save expiration handles.
type NoopAdapter struct{}
func (NoopAdapter) RequiresFlush() bool {
return false
}
func (NoopAdapter) Save(*HandleMap) error {
return nil
}
func (NoopAdapter) Load() (*HandleMap, error) {
return nil, nil
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment