Hi All,
We are planning to add utility classes to Malhar that provide write-ahead
log (WAL) capability to operators.
Motivation:
Reconstructing operator state - Some operators keep a large amount of state
in memory, which adds checkpointing overhead and slows down processing when
checkpoints happen frequently. Such operators can benefit from marking their
in-memory state as transient and reconstructing it from stored tuples. The
operator writes tuples to the WAL as they arrive and then processes them.
During recovery, the tuples are read back from the WAL to reconstruct
the in-memory state. (Tuple processing needs to be idempotent.)
Operators that maintain their state on the file system will also benefit
from this, as they do not have to update the persisted state frequently. They
can wait until enough data is available in memory and then apply that data
to the persisted state. On failure, the in-memory state
will be reconstructed from the WAL.
You can think of this functionality as similar to the buffer server, except
that it is persisted on HDFS and the operator can explicitly manage purging.
The WAL can also be used to implement WindowDataManager, which keeps
information about beginWindow and endWindow markers and about the tuples
between them, so that the tuples can be replayed in the same order. Using a
WAL will result in fewer files compared to
FSWindowDataManager.
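To illustrate the WindowDataManager idea, here is a minimal in-memory sketch of a log that stores window markers alongside tuples and replays them per window in the original order. The class WindowedLogSketch and its methods are hypothetical names for illustration only, and dropping a window that has no endWindow marker is an assumption, not part of the proposal.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a single log holding window markers and tuples.
class WindowedLogSketch {
  enum Kind { BEGIN_WINDOW, TUPLE, END_WINDOW }

  static final class Entry {
    final Kind kind;
    final long windowId;
    final String tuple; // null for marker entries

    Entry(Kind kind, long windowId, String tuple) {
      this.kind = kind;
      this.windowId = windowId;
      this.tuple = tuple;
    }
  }

  private final List<Entry> log = new ArrayList<>();
  private long currentWindow;

  void beginWindow(long id) {
    currentWindow = id;
    log.add(new Entry(Kind.BEGIN_WINDOW, id, null));
  }

  void tuple(String t) {
    log.add(new Entry(Kind.TUPLE, currentWindow, t));
  }

  void endWindow() {
    log.add(new Entry(Kind.END_WINDOW, currentWindow, null));
  }

  // Replays the log and returns the tuples of each completed window,
  // preserving both window order and tuple order.
  // Assumption: a window without an END_WINDOW marker is ignored.
  Map<Long, List<String>> replay() {
    Map<Long, List<String>> windows = new LinkedHashMap<>();
    List<String> open = null;
    for (Entry e : log) {
      switch (e.kind) {
        case BEGIN_WINDOW:
          open = new ArrayList<>();
          break;
        case TUPLE:
          if (open != null) {
            open.add(e.tuple);
          }
          break;
        case END_WINDOW:
          if (open != null) {
            windows.put(e.windowId, open);
            open = null;
          }
          break;
      }
    }
    return windows;
  }
}
```

Because all windows share one log, the number of files depends on the log size rather than on the number of windows.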
General Design
We will introduce two interfaces.
WALWriter - this will have the following methods:
- append : append the data at the end of the WAL.
- getOffset : return the current offset, which needs to be tracked for recovery.
- flush : make sure that the data is persisted to external storage.
WALReader - this will provide an iterator-like interface for accessing
the WAL:
- seek : seek to a particular offset.
- advance : read the next entry; reports whether an entry is available.
- get : get the entry read by the last call to advance.
- getOffset : return the current offset in the WAL.
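As a rough sketch of how the two interfaces could look, the block below models offsets as plain long entry counts and backs the log with in-memory lists. The generic parameter, the long offsets, the boolean return of advance, and the InMemoryWAL class are all assumptions made for illustration; the real signatures are still open.

```java
import java.util.ArrayList;
import java.util.List;

// Assumed shape of the writer; offsets are simplified to entry counts.
interface WALWriter<T> {
  void append(T entry); // add the entry at the end of the log
  long getOffset();     // offset to remember for recovery
  void flush();         // make buffered entries durable
}

// Assumed shape of the reader: an iterator-like cursor over the log.
interface WALReader<T> {
  void seek(long offset); // position the cursor at an offset
  boolean advance();      // read the next entry if one is available
  T get();                // entry read by the last successful advance
  long getOffset();       // offset of the next entry to read
}

// Hypothetical in-memory stand-in for a file-backed WAL.
class InMemoryWAL<T> {
  private final List<T> persisted = new ArrayList<>(); // "durable" entries
  private final List<T> buffer = new ArrayList<>();    // not yet flushed

  WALWriter<T> writer() {
    return new WALWriter<T>() {
      public void append(T entry) { buffer.add(entry); }
      public long getOffset() { return persisted.size() + buffer.size(); }
      public void flush() { persisted.addAll(buffer); buffer.clear(); }
    };
  }

  WALReader<T> reader() {
    return new WALReader<T>() {
      private int next = 0;
      private T current;

      public void seek(long offset) { next = (int) offset; current = null; }

      public boolean advance() {
        if (next >= persisted.size()) {
          return false; // nothing durable beyond the cursor
        }
        current = persisted.get(next++);
        return true;
      }

      public T get() { return current; }
      public long getOffset() { return next; }
    };
  }
}
```

In this sketch a reader only sees entries after flush has been called, which mirrors the durability role of flush in the proposal.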
The following implementations, which work with DFS file systems, will be
provided. These classes will take a serializer for converting data to a byte
array before writing and converting the byte array back to an object while
reading.
- FSWALReader implements WALReader
- FSWALWriter implements WALWriter
RollingFSWALReader, RollingFSWALWriter - these implementations will support
rolling files based on the size of the log. They will internally use
FSWALReader and FSWALWriter for reading and writing log segments.
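The size-based rolling can be sketched as follows, using in-memory byte buffers in place of DFS files. RollingLogSketch, the 4-byte length prefix, and the rule of rolling before an entry that would overflow the segment are assumptions for illustration, not the proposed on-disk format.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: each ByteArrayOutputStream stands in for one log segment file.
class RollingLogSketch {
  private final int maxSegmentBytes;
  private final List<ByteArrayOutputStream> segments = new ArrayList<>();

  RollingLogSketch(int maxSegmentBytes) {
    this.maxSegmentBytes = maxSegmentBytes;
    segments.add(new ByteArrayOutputStream());
  }

  // Appends a length-prefixed entry, starting a new segment first if the
  // current one is non-empty and the entry would push it past the size limit.
  void append(byte[] entry) {
    ByteArrayOutputStream current = segments.get(segments.size() - 1);
    if (current.size() > 0 && current.size() + 4 + entry.length > maxSegmentBytes) {
      current = new ByteArrayOutputStream();
      segments.add(current);
    }
    current.write(ByteBuffer.allocate(4).putInt(entry.length).array(), 0, 4);
    current.write(entry, 0, entry.length);
  }

  int segmentCount() {
    return segments.size();
  }

  // Reads every entry back in append order, segment by segment.
  List<byte[]> readAll() {
    List<byte[]> out = new ArrayList<>();
    for (ByteArrayOutputStream segment : segments) {
      ByteBuffer buf = ByteBuffer.wrap(segment.toByteArray());
      while (buf.remaining() >= 4) {
        byte[] entry = new byte[buf.getInt()];
        buf.get(entry);
        out.add(entry);
      }
    }
    return out;
  }
}
```

With this layout a pointer into the log would need both a segment index and an offset within the segment, which is one reason the manager below talks about a WALPointer rather than a plain offset.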
WALManager - this interface will provide the following methods:
- setup() : set up the WAL implementation and the serializer to use.
- runRecovery(Recoverable obj) : Recoverable is also an interface, with
just one method, recoveryTuple(tuple), which recovers a tuple read
from the WAL.
- setStart(WALPointer start) : set the marker; during recovery the WAL will
start reading tuples from this offset. The operator will call this method to
indicate that the data before the start pointer is no longer needed.
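The interplay of setStart and runRecovery can be sketched with a small in-memory manager. WALManagerSketch, the write method that returns the offset after the written tuple, and the use of a long in place of WALPointer are assumptions for illustration; the callback is named recoveryTuple as in the operator example that follows.

```java
import java.util.ArrayList;
import java.util.List;

// Callback invoked once per tuple read back from the WAL.
interface Recoverable<T> {
  void recoveryTuple(T tuple);
}

// Hypothetical in-memory manager; a long stands in for WALPointer.
class WALManagerSketch<T> {
  private final List<T> log = new ArrayList<>();
  private long start = 0;

  // Appends a tuple and returns the offset just past it.
  long write(T tuple) {
    log.add(tuple);
    return log.size();
  }

  // Marks everything before the pointer as no longer needed.
  // This sketch only skips that data on replay; it does not purge it.
  void setStart(long pointer) {
    start = pointer;
  }

  // Replays every tuple from the start pointer onward, in write order.
  void runRecovery(Recoverable<T> target) {
    for (int i = (int) start; i < log.size(); i++) {
      target.recoveryTuple(log.get(i));
    }
  }
}
```

For example, if four tuples are written and setStart is called with the offset returned by the second write, runRecovery hands only the third and fourth tuples to the Recoverable.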
For example, an abstract implementation of RecoverableOperator could be:
```java
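import java.io.IOException;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;

// WAL, WALManager, WALPointer and Serializer are the proposed classes; they do not exist yet.
public abstract class RecoverableOperator<T> extends BaseOperator
    implements WAL.Recoverable<T>, Operator.CheckpointListener
{
  private WALManager<T> wm;
  // Start of the WAL data that is still needed; maintained by the concrete operator.
  protected WALPointer pointer;

  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
  {
    @Override
    public void process(T t)
    {
      processTuple(t, false);
    }
  };

  @Override
  public void setup(Context.OperatorContext context)
  {
    wm.setup(context, new Serializer<T>());
    /* build any in-memory state */
    wm.runRecovery(this);
  }

  void processTuple(T tuple, boolean recovery)
  {
    // During normal processing, write the tuple to the WAL before processing it.
    // During recovery the tuple is already in the WAL, so do not write it again.
    if (!recovery) {
      try {
        wm.write(tuple); // assumed to throw IOException on a storage failure
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
    processTuple(tuple);
  }

  @Override
  public void recoveryTuple(T tuple)
  {
    processTuple(tuple, true);
  }

  @Override
  public void checkpointed(long windowId)
  {
  }

  @Override
  public void committed(long windowId)
  {
    // Advance the start pointer once the data before it is no longer needed.
    wm.setStart(pointer);
  }

  protected abstract void processTuple(T tuple);
}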
```
Let me know your thoughts.
-Tushar.