Commit 08ddb196 authored by Dustin L. Howett's avatar Dustin L. Howett

Provide a channel for the explicit purpose of error communication.

parent 7e5ab491
......@@ -20,12 +20,16 @@ type expirationHandle struct {
// Expirator provides the primary mechanism of gotimeout: an expiration daemon.
type Expirator struct {
// A channel upon which a client can receive error messages from the daemon.
ErrorChannel <-chan error
store ExpirableStore
dataPath string
expirationMap map[ExpirableID]*expirationHandle
expirationChannel chan *expirationHandle
flushRequired bool
urgentFlushRequired bool
errorChannelSend chan<- error
}
// Expirable represents any object that might be expired.
......@@ -47,6 +51,11 @@ func NewExpirator(path string, store ExpirableStore) *Expirator {
dataPath: path,
expirationChannel: make(chan *expirationHandle, 1000),
}
errorChannel := make(chan error, 1)
e.ErrorChannel = (<-chan error)(errorChannel)
e.errorChannelSend = (chan<- error)(errorChannel)
go e.run()
return e
}
......@@ -55,46 +64,57 @@ func (e *Expirator) canSave() bool {
return e.dataPath != ""
}
func (e *Expirator) loadExpirations() {
func (e *Expirator) loadExpirations() error {
if !e.canSave() {
return
return nil
}
file, err := os.Open(e.dataPath)
if err != nil {
return
// Being unable to open the file is not to be considered an error condition:
// the file not existing means that we simply haven't written anything yet.
return nil
}
defer file.Close()
gobDecoder := gob.NewDecoder(file)
tempMap := make(map[ExpirableID]*expirationHandle)
gobDecoder.Decode(&tempMap)
file.Close()
err = gobDecoder.Decode(&tempMap)
if err != nil {
return err
}
for _, v := range tempMap {
e.registerExpirationHandle(v)
}
return nil
}
func (e *Expirator) saveExpirations() {
func (e *Expirator) saveExpirations() error {
if !e.canSave() {
return
return nil
}
if e.expirationMap == nil {
return
return nil
}
file, err := os.Create(e.dataPath)
if err != nil {
return
return err
}
defer file.Close()
gobEncoder := gob.NewEncoder(file)
gobEncoder.Encode(e.expirationMap)
file.Close()
err = gobEncoder.Encode(e.expirationMap)
if err != nil {
return err
}
e.flushRequired, e.urgentFlushRequired = false, false
return nil
}
func (e *Expirator) registerExpirationHandle(ex *expirationHandle) {
......@@ -127,7 +147,11 @@ func (e *Expirator) cancelExpirationHandle(ex *expirationHandle) {
}
func (e *Expirator) run() {
go e.loadExpirations()
go func() {
if err := e.loadExpirations(); err != nil {
e.errorChannelSend <- err
}
}()
var flushTickerChan, urgentFlushTickerChan <-chan time.Time
if e.canSave() {
flushTickerChan, urgentFlushTickerChan = time.NewTicker(30*time.Second).C, time.NewTicker(1*time.Second).C
......@@ -137,12 +161,16 @@ func (e *Expirator) run() {
// 30-second flush timer (only save if changed)
case _ = <-flushTickerChan:
if e.expirationMap != nil && (e.flushRequired || e.urgentFlushRequired) {
e.saveExpirations()
if err := e.saveExpirations(); err != nil {
e.errorChannelSend <- err
}
}
// 1-second flush timer (only save if *super-urgent, but still throttle)
case _ = <-urgentFlushTickerChan:
if e.expirationMap != nil && e.urgentFlushRequired {
e.saveExpirations()
if err := e.saveExpirations(); err != nil {
e.errorChannelSend <- err
}
}
case expiration := <-e.expirationChannel:
delete(e.expirationMap, expiration.ID)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment