Keeping data in sync
You might have noticed that, back in our NewRaftSetup function, we had these lines:
cfg.fsm = &fsm{
	dataFile: fmt.Sprintf("%s/data.json", storagePath),
}
The fsm type is implemented in our store package and implements hashicorp/raft's FSM interface. FSM stands for finite state machine. According to the docs, an FSM implementation needs three methods:
Apply(*Log) interface{}
Snapshot() (FSMSnapshot, error)
Restore(io.ReadCloser) error
As such, we need to create a new struct with those three methods to replicate our data. Apply changes our local state, Snapshot takes a snapshot of that state, and Restore overwrites our local state with a snapshot. We also need to implement the FSMSnapshot interface, which requires two methods:
Persist(sink SnapshotSink) error
Release()
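To make the three-method contract concrete, here is a minimal sketch that uses only the standard library. It does not use the real raft types: stateMachine is a simplified, hypothetical stand-in for the FSM interface, memFSM keeps its state in a map rather than a file, and the "key=value" entry format and the copyState helper are assumptions made purely for illustration.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

// stateMachine is a simplified, hypothetical stand-in for raft.FSM.
// The real interface takes a *raft.Log and returns a raft.FSMSnapshot.
type stateMachine interface {
	Apply(entry []byte) interface{}
	Snapshot() ([]byte, error)
	Restore(r io.ReadCloser) error
}

// memFSM keeps its state in a map instead of a file.
type memFSM struct {
	data map[string]string
}

// Compile-time check that memFSM has all three methods.
var _ stateMachine = (*memFSM)(nil)

// Apply changes local state. Entries here are "key=value" strings.
func (m *memFSM) Apply(entry []byte) interface{} {
	key, value, ok := bytes.Cut(entry, []byte("="))
	if !ok {
		return fmt.Errorf("malformed entry %q", entry)
	}
	m.data[string(key)] = string(value)
	return nil
}

// Snapshot captures the current state as bytes.
func (m *memFSM) Snapshot() ([]byte, error) {
	return json.Marshal(m.data)
}

// Restore overwrites local state with a snapshot, discarding what was there.
func (m *memFSM) Restore(r io.ReadCloser) error {
	fresh := map[string]string{}
	if err := json.NewDecoder(r).Decode(&fresh); err != nil {
		return err
	}
	m.data = fresh
	return nil
}

// copyState snapshots src and restores the result into dst.
func copyState(src, dst *memFSM) error {
	snap, err := src.Snapshot()
	if err != nil {
		return err
	}
	return dst.Restore(io.NopCloser(bytes.NewReader(snap)))
}

func main() {
	leader := &memFSM{data: map[string]string{}}
	leader.Apply([]byte("a=1"))
	leader.Apply([]byte("b=2"))

	follower := &memFSM{data: map[string]string{"stale": "x"}}
	if err := copyState(leader, follower); err != nil {
		panic(err)
	}
	fmt.Println(follower.data) // map[a:1 b:2]
}
```

Note that Restore replaces the follower's map rather than merging into it, which is why the "stale" key disappears.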
To start, we will create two structs to store our data.
type fsm struct {
	dataFile string
	lock     *flock.Flock
}

type fsmSnapshot struct {
	data []byte
}
Our fsm type holds the path of the file where data is stored, plus a file lock so that only one operation touches that file at a time. We are using flock to implement the file lock. We could guard access ourselves with Go's sync.Mutex, but flock is a bit higher-level and makes things easier for us.
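For comparison, here is a rough sketch of the sync.Mutex alternative mentioned above. The guardedStore type and runConcurrently helper are hypothetical, and the store is an in-memory map rather than a file. One difference worth keeping in mind: a sync.Mutex only coordinates goroutines inside a single process, while a file lock can also coordinate separate processes that share the same file.

```go
package main

import (
	"fmt"
	"sync"
)

// guardedStore is a hypothetical in-memory stand-in for the file-backed fsm.
type guardedStore struct {
	mu   sync.Mutex
	data map[string]int
}

// increment performs a load-modify-save cycle under the lock, so two
// callers can never interleave their reads and writes.
func (s *guardedStore) increment(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	current := s.data[key]    // load
	s.data[key] = current + 1 // modify and save
}

// runConcurrently calls increment from n goroutines and returns the final count.
func runConcurrently(n int) int {
	s := &guardedStore{data: map[string]int{}}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.increment("hits")
		}()
	}
	wg.Wait()
	return s.data["hits"]
}

func main() {
	fmt.Println(runConcurrently(100)) // 100
}
```

Without the lock, concurrent increments could overwrite each other and the final count could come out below 100.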
The first thing we need to do is implement the interface functions that the raft library requires. The docs for the interface say:
Apply log is invoked once a log entry is committed. It returns a value which will be made available in the ApplyFuture returned by Raft.Apply method if that method was called on the same Raft node as the FSM.
func (f *fsm) Apply(l *raft.Log) interface{} {
	log.Info("fsm.Apply called", "type", hclog.Fmt("%d", l.Type), "data", hclog.Fmt("%s", l.Data))
	var cmd Command
	if err := json.Unmarshal(l.Data, &cmd); err != nil {
		log.Error("failed command unmarshal", "error", err)
		return nil
	}
	ctx := context.Background()
	switch cmd.Action {
	case "set":
		return f.localSet(ctx, cmd.Key, cmd.Value)
	case "delete":
		return f.localDelete(ctx, cmd.Key)
	default:
		log.Error("unknown command", "command", cmd, "log", l)
	}
	return nil
}
In our implementation, we are ignoring the ApplyFuture functionality. Instead, we just parse out the action we need to perform and run it against the local data store, which we will implement below.
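The parse-then-dispatch step can be tried in isolation. The sketch below is not the chapter's code: the Command struct only mirrors the three fields Apply reads (Action, Key, Value), its JSON tags are an assumption, and dispatch and encodeCommand are hypothetical helpers that work on a plain map. Unlike the Apply above, dispatch returns an error for bad JSON and unknown actions instead of only logging them, which is one possible design choice rather than a requirement.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Command mirrors the fields that Apply reads (Action, Key, Value).
// The JSON tags are an assumption; the real type is defined elsewhere.
type Command struct {
	Action string `json:"action"`
	Key    string `json:"key,omitempty"`
	Value  string `json:"value,omitempty"`
}

// dispatch decodes one log payload and applies it to data.
// It reports every failure to the caller instead of only logging it.
func dispatch(data map[string]string, payload []byte) error {
	var cmd Command
	if err := json.Unmarshal(payload, &cmd); err != nil {
		return fmt.Errorf("unmarshal command: %w", err)
	}
	switch cmd.Action {
	case "set":
		data[cmd.Key] = cmd.Value
	case "delete":
		delete(data, cmd.Key)
	default:
		return errors.New("unknown action: " + cmd.Action)
	}
	return nil
}

// encodeCommand builds the bytes that would be submitted as a log entry.
// Marshaling a struct of plain strings cannot fail, so the error is dropped.
func encodeCommand(action, key, value string) []byte {
	b, _ := json.Marshal(Command{Action: action, Key: key, Value: value})
	return b
}

func main() {
	data := map[string]string{}
	fmt.Println(dispatch(data, encodeCommand("set", "color", "blue")))
	fmt.Println(data)
	fmt.Println(dispatch(data, encodeCommand("delete", "color", "")))
	fmt.Println(data)
	fmt.Println(dispatch(data, []byte(`{"action":"rename"}`)))
}
```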
Snapshot needs to return a snapshot of the FSM. Ours is currently very simple: it stores only the data that is in our local datastore. To make it more powerful, we could include a history of the logs seen, but for now this works. The code is short: it loads the data, encodes it, and wraps the result in an fsmSnapshot.
func (f *fsm) Snapshot() (raft.FSMSnapshot, error) {
	log.Info("fsm.Snapshot called")
	data, err := f.loadData(context.Background())
	if err != nil {
		return nil, err
	}
	encodedData, err := encode(data)
	if err != nil {
		return nil, err
	}
	return &fsmSnapshot{data: encodedData}, nil
}
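Since Snapshot returns an fsmSnapshot, that type has to provide the Persist and Release methods listed earlier. What follows is one plausible shape for them, not necessarily the one this book uses. To keep it runnable with only the standard library, sink is a hypothetical, trimmed-down stand-in for the library's SnapshotSink, and memSink is a test double invented for this example.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// sink is a hypothetical, trimmed-down stand-in for raft.SnapshotSink:
// something we can write to, then either close (keep) or cancel (discard).
type sink interface {
	Write(p []byte) (int, error)
	Close() error
	Cancel() error
}

type fsmSnapshot struct {
	data []byte
}

// Persist writes the captured bytes to the sink. On a write failure it
// cancels the sink so that a partial snapshot is not kept.
func (s *fsmSnapshot) Persist(out sink) error {
	if _, err := out.Write(s.data); err != nil {
		out.Cancel()
		return fmt.Errorf("write snapshot: %w", err)
	}
	return out.Close()
}

// Release is called when the snapshot is no longer needed.
// There is nothing to free for an in-memory byte slice.
func (s *fsmSnapshot) Release() {}

// memSink is a test double that records what happened to it.
type memSink struct {
	buf       bytes.Buffer
	failWrite bool
	closed    bool
	cancelled bool
}

func (m *memSink) Write(p []byte) (int, error) {
	if m.failWrite {
		return 0, errors.New("disk full")
	}
	return m.buf.Write(p)
}

func (m *memSink) Close() error  { m.closed = true; return nil }
func (m *memSink) Cancel() error { m.cancelled = true; return nil }

func main() {
	snap := &fsmSnapshot{data: []byte(`{"color":"blue"}`)}

	good := &memSink{}
	fmt.Println(snap.Persist(good), good.buf.String(), good.closed)

	bad := &memSink{failWrite: true}
	fmt.Println(snap.Persist(bad), bad.cancelled)
}
```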
Restore is just the inverse of Snapshot. It reads the data from an io.ReadCloser, decodes it, and writes it to disk.
func (f *fsm) Restore(old io.ReadCloser) error {
	log.Info("fsm.Restore called")
	b, err := ioutil.ReadAll(old)
	if err != nil {
		return err
	}
	data, err := decode(b)
	if err != nil {
		return err
	}
	return f.saveData(context.Background(), data)
}
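To see the overwrite behavior end to end, here is a self-contained sketch under a few assumptions: the state is a JSON object in a single file, fileStore is a hypothetical stand-in for our fsm type, and its saveData and loadData are simplified guesses (no context, no locking) rather than the helpers used in this chapter.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
)

// fileStore is a hypothetical stand-in for the chapter's fsm type.
type fileStore struct {
	dataFile string
}

// saveData assumes the state is stored as one JSON object on disk.
func (f *fileStore) saveData(data map[string]string) error {
	b, err := json.Marshal(data)
	if err != nil {
		return err
	}
	return os.WriteFile(f.dataFile, b, 0o644)
}

// loadData reads the JSON object back into a map.
func (f *fileStore) loadData() (map[string]string, error) {
	b, err := os.ReadFile(f.dataFile)
	if err != nil {
		return nil, err
	}
	data := map[string]string{}
	if err := json.Unmarshal(b, &data); err != nil {
		return nil, err
	}
	return data, nil
}

// Restore replaces whatever is on disk with the snapshot's contents.
func (f *fileStore) Restore(old io.ReadCloser) error {
	b, err := io.ReadAll(old)
	if err != nil {
		return err
	}
	data := map[string]string{}
	if err := json.Unmarshal(b, &data); err != nil {
		return err
	}
	return f.saveData(data)
}

// restoreDemo seeds a store with stale data, restores a snapshot over it,
// and returns what is on disk afterwards.
func restoreDemo(snapshot string) (map[string]string, error) {
	dir, err := os.MkdirTemp("", "fsm-restore")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)

	f := &fileStore{dataFile: filepath.Join(dir, "data.json")}
	if err := f.saveData(map[string]string{"stale": "x"}); err != nil {
		return nil, err
	}
	if err := f.Restore(io.NopCloser(strings.NewReader(snapshot))); err != nil {
		return nil, err
	}
	return f.loadData()
}

func main() {
	data, err := restoreDemo(`{"color":"blue"}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(data) // map[color:blue]
}
```

Because the snapshot is decoded before anything is written, a corrupt snapshot returns an error and leaves the existing file untouched.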
Now we need to implement our local functions. These should look very similar to functions from the beginning of our key-value server in Chapter 2. They load data, modify it, and then save it.
func (f *fsm) localSet(ctx context.Context, key, value string) error {
	data, err := f.loadData(ctx)
	if err != nil {
		return err
	}
	data[key] = value
	return f.saveData(ctx, data)
}
func (f *fsm) localGet(ctx context.Context, key string) (string, error) {
	data, err := f.loadData(ctx)
	if err != nil {
		return "", fmt.Errorf("load: %w", err)
	}
This page is a preview of Reliable Webservers with Go