[ https://issues.apache.org/jira/browse/APEXMALHAR-1965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15110487#comment-15110487 ]

Tushar Gosavi commented on APEXMALHAR-1965:
-------------------------------------------

We are planning to add utility classes to Malhar that provide Write Ahead Log (WAL) capability to operators.

h2. Motivation
Reconstructing operator state: Some operators keep a huge state in memory, which causes checkpointing overhead and slows down
processing when checkpoints happen frequently. Such operators can benefit from keeping their in-memory state transient and
reconstructing it from the stored tuples. The operator writes each tuple to the WAL as it arrives and then processes it.
During recovery, the tuples are read back from the WAL to reconstruct the in-memory state. (The processing of tuples needs to be idempotent.)
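A minimal sketch of the write-then-process and replay cycle, assuming an in-memory list stands in for the persisted WAL and a per-word count is the transient operator state. `WordCountState`, `onTuple` and `rebuildFrom` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical transient operator state: a count per word, never checkpointed.
class WordCountState {
  final Map<String, Integer> counts = new HashMap<>();

  void apply(String word) {
    counts.merge(word, 1, Integer::sum);
  }
}

class ReplayDemo {
  // Stand-in for the WAL; a real WAL would be persisted on HDFS.
  static final List<String> wal = new ArrayList<>();

  // Normal path: append the tuple to the WAL first, then update in-memory state.
  static void onTuple(String word, WordCountState state) {
    wal.add(word);
    state.apply(word);
  }

  // Recovery path: rebuild the state by replaying the WAL; nothing is written back.
  static WordCountState rebuildFrom(List<String> log) {
    WordCountState state = new WordCountState();
    for (String word : log) {
      state.apply(word);
    }
    return state;
  }

  public static void main(String[] args) {
    WordCountState live = new WordCountState();
    onTuple("a", live);
    onTuple("b", live);
    onTuple("a", live);
    // Simulate a failure: the in-memory state is lost, but the WAL survives.
    WordCountState recovered = rebuildFrom(wal);
    System.out.println(recovered.counts.equals(live.counts)); // true
  }
}
```

Replay starts from an empty state here, so each logged tuple is applied exactly once; an operator that can see a tuple both before and after a failure is why the processing has to be idempotent.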

Operators that maintain their state on the file system will also benefit, because they do not have to update the persisted state
as often, which reduces disk I/O. They can wait until enough data has accumulated in memory and then apply it to the persisted
state in one batch. On failure, the in-memory state is reconstructed from the WAL.
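The batching pattern can be sketched as follows, assuming a `Map` stands in for the persisted store and a list index stands in for a WAL offset. `BatchingOperator`, `flushThreshold` and `walStart` are hypothetical; the point is that the persisted store is written once per batch, and after a failure only the WAL entries from `walStart` onward need to be replayed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BatchingOperator {
  final List<String> wal = new ArrayList<>();              // stand-in for the WAL
  final Map<String, Integer> persisted = new HashMap<>();   // stand-in for on-disk state
  Map<String, Integer> pending = new HashMap<>();           // in-memory delta, not yet persisted
  int walStart = 0;        // WAL offset of the first tuple not yet applied to persisted state
  int persistWrites = 0;   // counts simulated writes to the persisted store
  final int flushThreshold;

  BatchingOperator(int flushThreshold) {
    this.flushThreshold = flushThreshold;
  }

  void process(String key) {
    wal.add(key);                          // log first
    pending.merge(key, 1, Integer::sum);   // then update the in-memory delta
    if (wal.size() - walStart >= flushThreshold) {
      flush();
    }
  }

  // Apply the accumulated delta to the persisted state in one write, then advance the WAL start.
  void flush() {
    pending.forEach((k, v) -> persisted.merge(k, v, Integer::sum));
    persistWrites++;
    pending = new HashMap<>();
    walStart = wal.size();
  }

  // After a failure the delta is lost; rebuild it from the WAL tail only.
  void recover() {
    pending = new HashMap<>();
    for (String key : wal.subList(walStart, wal.size())) {
      pending.merge(key, 1, Integer::sum);
    }
  }
}
```

In a real operator the WAL start offset would have to be stored atomically with the persisted state, otherwise a failure between the two writes could replay a batch twice or skip it.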

You can think of this functionality as similar to the buffer server, except that the data is persisted on HDFS
and the operator explicitly manages purging.

The WAL can also be used to implement WindowDataManager, which keeps the beginWindow and endWindow markers along with
information about the tuples between them, and uses this information to replay the tuples in the same order. Using a WAL
will result in fewer files compared to FSWindowDataManager.
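One way a WAL-backed window data manager could lay out its data, assuming a single append-only list of records tagged as begin-window, tuple, or end-window markers. `WindowLog`, `Entry` and `Kind` are hypothetical types for illustration, not the planned API. All windows share one log instead of one file per window, and a replay scans for the requested window's markers and returns its tuples in their original order.

```java
import java.util.ArrayList;
import java.util.List;

class WindowLog {
  enum Kind { BEGIN_WINDOW, TUPLE, END_WINDOW }

  // Hypothetical WAL record: a window marker, or a tuple tagged with its window id.
  static final class Entry {
    final Kind kind;
    final long windowId;
    final String payload;

    Entry(Kind kind, long windowId, String payload) {
      this.kind = kind;
      this.windowId = windowId;
      this.payload = payload;
    }
  }

  private final List<Entry> log = new ArrayList<>();  // stand-in for one WAL file
  private long currentWindow = -1;

  void beginWindow(long windowId) {
    currentWindow = windowId;
    log.add(new Entry(Kind.BEGIN_WINDOW, windowId, null));
  }

  void tuple(String payload) {
    log.add(new Entry(Kind.TUPLE, currentWindow, payload));
  }

  void endWindow() {
    log.add(new Entry(Kind.END_WINDOW, currentWindow, null));
  }

  // Returns the tuples of a completed window in arrival order, or null if the
  // window never reached its endWindow marker (incomplete windows are not replayed).
  List<String> replay(long windowId) {
    List<String> tuples = null;
    for (Entry e : log) {
      if (e.windowId != windowId) {
        continue;
      }
      switch (e.kind) {
        case BEGIN_WINDOW:
          tuples = new ArrayList<>();
          break;
        case TUPLE:
          tuples.add(e.payload);
          break;
        case END_WINDOW:
          return tuples;
      }
    }
    return null;
  }
}
```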

h2. General Design
We will introduce two interfaces:

h3. {{WALWriter}}
It will have the following methods:
 - append: append data at the end of the WAL.
 - getOffset: return the offset that needs to be tracked for recovery.
 - flush: make sure the data is persisted to the external storage.
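A minimal sketch of the writer contract, assuming the method signatures below (they are illustrative, not the final interface) and a `Function<T, byte[]>` as the serializer. `ByteArrayWALWriter` is a hypothetical in-memory implementation that stores each entry as a 4-byte length followed by its bytes, so the offset is simply the number of bytes written so far.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.function.Function;

// Hypothetical shape of the proposed interface; the final signatures may differ.
interface WALWriter<T> {
  void append(T entry) throws IOException;
  long getOffset();
  void flush() throws IOException;
}

// In-memory writer using length-prefixed records.
class ByteArrayWALWriter<T> implements WALWriter<T> {
  private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
  private final DataOutputStream out = new DataOutputStream(buffer);
  private final Function<T, byte[]> serializer;

  ByteArrayWALWriter(Function<T, byte[]> serializer) {
    this.serializer = serializer;
  }

  @Override
  public void append(T entry) throws IOException {
    byte[] bytes = serializer.apply(entry);
    out.writeInt(bytes.length);  // 4-byte length header
    out.write(bytes);            // payload
  }

  @Override
  public long getOffset() {
    return out.size();  // bytes written so far
  }

  @Override
  public void flush() throws IOException {
    // A DFS-backed writer would typically call hflush() or hsync() on its
    // FSDataOutputStream here; for an in-memory buffer this just flushes the stream.
    out.flush();
  }

  byte[] toByteArray() {
    return buffer.toByteArray();
  }
}
```

The offset an operator records after `flush()` marks the point up to which the log is known to be durable.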

h3. {{WALReader}}
This will provide an iterator-like interface for accessing the WAL:
 - seek: seek to a particular offset.
 - advance: read the next entry; returns true if an entry is available.
 - get: return the current entry read by advance.
 - getOffset: return the current offset in the WAL.
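The reader side can be sketched the same way, assuming the signatures below (illustrative only) and the length-prefixed record layout of a 4-byte length followed by the payload. `ByteArrayWALReader` is hypothetical; it shows how `seek`, `advance`, `get` and `getOffset` fit together, including stopping at a truncated final record.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.function.Function;

// Hypothetical shape of the proposed interface; the final signatures may differ.
interface WALReader<T> {
  void seek(long offset) throws IOException;
  boolean advance() throws IOException;
  T get();
  long getOffset();
}

// Reads length-prefixed records (4-byte length, then payload) from a byte array.
class ByteArrayWALReader<T> implements WALReader<T> {
  private final byte[] data;
  private final Function<byte[], T> deserializer;
  private int offset;
  private T current;

  ByteArrayWALReader(byte[] data, Function<byte[], T> deserializer) {
    this.data = data;
    this.deserializer = deserializer;
  }

  @Override
  public void seek(long offset) {
    this.offset = (int) offset;
    this.current = null;
  }

  @Override
  public boolean advance() throws IOException {
    if (offset + 4 > data.length) {
      return false;  // no complete length header left
    }
    DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(data, offset, data.length - offset));
    int length = in.readInt();
    if (offset + 4 + length > data.length) {
      return false;  // truncated record, e.g. a write that never completed
    }
    byte[] payload = new byte[length];
    in.readFully(payload);
    current = deserializer.apply(payload);
    offset += 4 + length;
    return true;
  }

  @Override
  public T get() {
    return current;
  }

  @Override
  public long getOffset() {
    return offset;
  }
}
```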

The following implementations, which work with DFS file systems, will be provided.
These classes will take a serializer for converting data to a byte array before
writing, and for converting a byte array back to an object while reading.

- {{FSWALReader}} implements {{WALReader}}
- {{FSWALWriter}} implements {{WALWriter}}
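A file-backed sketch of the serializer-driven writer and reader, assuming the local file system stands in for HDFS; with Hadoop, the streams would instead come from `FileSystem.create`/`FileSystem.append` and `FileSystem.open`. `Serializer`, `StringSerializer`, `LocalFileWALWriter` and `LocalFileWALReader` are hypothetical names, not the planned classes.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical serializer contract the FS classes could take.
interface Serializer<T> {
  byte[] toBytes(T obj);
  T fromBytes(byte[] bytes);
}

class StringSerializer implements Serializer<String> {
  public byte[] toBytes(String s) { return s.getBytes(StandardCharsets.UTF_8); }
  public String fromBytes(byte[] b) { return new String(b, StandardCharsets.UTF_8); }
}

// Local-file stand-in for an FS writer: appends length-prefixed serialized entries.
class LocalFileWALWriter<T> implements AutoCloseable {
  private final DataOutputStream out;
  private final Serializer<T> serializer;

  LocalFileWALWriter(String path, Serializer<T> serializer) throws IOException {
    this.out = new DataOutputStream(
        new BufferedOutputStream(new FileOutputStream(path, true)));  // append mode
    this.serializer = serializer;
  }

  void append(T entry) throws IOException {
    byte[] bytes = serializer.toBytes(entry);
    out.writeInt(bytes.length);
    out.write(bytes);
  }

  void flush() throws IOException {
    out.flush();
  }

  @Override
  public void close() throws IOException {
    out.close();
  }
}

// Local-file stand-in for an FS reader: deserializes entries until end of file.
class LocalFileWALReader {
  static <T> List<T> readAll(String path, Serializer<T> serializer) throws IOException {
    List<T> entries = new ArrayList<>();
    try (DataInputStream in = new DataInputStream(
        new BufferedInputStream(new FileInputStream(path)))) {
      while (true) {
        int length;
        try {
          length = in.readInt();
        } catch (EOFException eof) {
          break;  // clean end of log
        }
        byte[] bytes = new byte[length];
        in.readFully(bytes);
        entries.add(serializer.fromBytes(bytes));
      }
    }
    return entries;
  }
}
```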

{{RollingFSWALReader}} and {{RollingFSWALWriter}} will support rolling over to a new file based on
the size of the log. They will internally use {{FSWALReader}} and {{FSWALWriter}}
for reading and writing the individual log segments.
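A sketch of size-based rolling, assuming an offset in a rolling log is a pair of (segment index, byte offset within the segment). The `WALPointer` fields and `RollingWALWriter` class shown here are hypothetical; in-memory buffers stand in for the segment files (for example `wal_0`, `wal_1`, ...).

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical pointer into a rolling log: which segment, and the byte offset inside it.
final class WALPointer {
  final int segment;
  final long offset;

  WALPointer(int segment, long offset) {
    this.segment = segment;
    this.offset = offset;
  }
}

class RollingWALWriter {
  private final long maxSegmentSize;
  private final List<ByteArrayOutputStream> segments = new ArrayList<>();
  private DataOutputStream current;

  RollingWALWriter(long maxSegmentSize) {
    this.maxSegmentSize = maxSegmentSize;
    roll();
  }

  // Start a new segment; a file-backed version would close the old file and open the next one.
  private void roll() {
    ByteArrayOutputStream segment = new ByteArrayOutputStream();
    segments.add(segment);
    current = new DataOutputStream(segment);
  }

  void append(byte[] entry) throws IOException {
    // Roll before writing once the current segment has reached the limit,
    // so an entry is never split across two segments.
    if (current.size() >= maxSegmentSize) {
      roll();
    }
    current.writeInt(entry.length);
    current.write(entry);
  }

  WALPointer getOffset() {
    return new WALPointer(segments.size() - 1, current.size());
  }

  int segmentCount() {
    return segments.size();
  }
}
```

Because the check happens before each write, a segment can exceed the limit by at most one entry; the alternative of checking after each write gives the same bound but rolls one entry earlier.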

h3. {{WALManager}}
This interface will provide the following methods:
 - setup(): set up the WAL implementation and the serializer to use.
 - recovery(Recoverable obj): Recoverable is also an interface, with a single method recover() that recovers a tuple
   from the WAL.
 - setStart(WALPointer start): set the marker from which the WAL will start reading tuples during recovery. The operator
   calls this method to indicate that the data before the start pointer is no longer needed.
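An in-memory sketch of how `recovery` and `setStart` could interact, assuming a plain `long` stands in for `WALPointer` and a list stands in for the log. `InMemoryWALManager`, `write` and `currentOffset` are hypothetical names; this `Recoverable` is a simplified stand-in with a single `recover(T tuple)` callback.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback the operator implements; invoked once per recovered tuple.
interface Recoverable<T> {
  void recover(T tuple);
}

// In-memory manager sketch. A long offset stands in for WALPointer.
class InMemoryWALManager<T> {
  private final List<T> log = new ArrayList<>();
  private long start = 0;  // absolute offset of log.get(0); recovery replays from here

  void write(T tuple) {
    log.add(tuple);
  }

  // Offset just past the last written entry.
  long currentOffset() {
    return start + log.size();
  }

  // Entries before newStart are no longer needed: drop them and replay from newStart.
  void setStart(long newStart) {
    if (newStart < start || newStart > currentOffset()) {
      throw new IllegalArgumentException("start must move forward within the log");
    }
    log.subList(0, (int) (newStart - start)).clear();
    start = newStart;
  }

  // Replay every retained entry, in write order, into the recoverable target.
  void recovery(Recoverable<T> target) {
    for (T tuple : log) {
      target.recover(tuple);
    }
  }
}
```

A typical flow: the operator writes tuples, persists the state they produced, then calls `setStart(currentOffset())` so that a later recovery replays only the tuples written after that point.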

For example, an abstract implementation of RecoverableOperator could look like this:
{code}
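import java.io.IOException;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;

public abstract class RecoverableOperator<T> extends BaseOperator
  implements WAL.Recoverable<T>, Operator.CheckpointListener
{
  // WALManager and Serializer are the proposed types; both are checkpointed with the
  // operator, while the in-memory state built by processTuple() stays transient.
  private WALManager<T> wm;
  private Serializer<T> serializer;

  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
  {
    @Override
    public void process(T t)
    {
      processTuple(t, false);
    }
  };

  @Override
  public void setup(Context.OperatorContext context)
  {
    wm.setup(context, serializer);
    /* build any in-memory state */
    // Replay the tuples after the start marker; each one is delivered through recover().
    wm.recovery(this);
  }

  void processTuple(T tuple, boolean recovery)
  {
    // During normal processing, write the tuple to the WAL before processing it.
    // During recovery the tuple is already in the WAL, so don't write it again.
    if (!recovery) {
      try {
        wm.write(tuple);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
    processTuple(tuple);
  }

  @Override
  public void recover(T tuple)
  {
    processTuple(tuple, true);
  }

  @Override
  public void checkpointed(long windowId)
  {
  }

  @Override
  public void committed(long windowId)
  {
    // Once the state up to this window is persisted elsewhere, call wm.setStart(...)
    // so that older WAL entries are no longer replayed and can be purged.
  }

  protected abstract void processTuple(T tuple);
}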
{code}


> Create a WAL in Malhar
> ----------------------
>
>                 Key: APEXMALHAR-1965
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-1965
>             Project: Apache Apex Malhar
>          Issue Type: Task
>            Reporter: Chandni Singh
>            Assignee: Tushar Gosavi
>
> In Malhar we have an IdempotentStorageManager which we use like a Write Ahead 
> Logger. There are also some other places where we have created different 
> flavors of Write Ahead Logger. 
> We need to find the overlap between all these flavors and create a common Write 
> Ahead Logger for use in Apex Core and Apex Malhar.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
