Hi Chandni,

There is no change in plan; I will be working on the basic WAL infrastructure.
I have not discussed how to implement WindowDataManager using WAL, since I
know that you will be working on it.

This mail is to declare the interfaces for the Write Ahead Log and the file
system based implementations of the Write Ahead Log. The classes I will be
adding are:
- WALReader - Interface.
- WALWriter - Interface.
- FSWALReader - Implementation of WALReader which reads from the FS.
- FSWALWriter - Implementation of WALWriter which writes to the FS.
- RollingFSWALReader - Implementation of WALReader which reads size-based
  rolling log segments.
- RollingFSWALWriter - Implementation of WALWriter which rolls log segments
  based on size.
- WALManager - Interface.
- AbstractWALManager - The functionality of WALManager abstracted out from
  HDHT.

RecoverableOperator is not for idempotency; it is an abstracted-out pattern
for operators like HDHT. It can be added later if it turns out to be a common
pattern.

- Tushar.

On Thu, Jan 21, 2016 at 6:07 PM, Chandni Singh <[email protected]> wrote:

> Hi Tushar,
> I thought the plan was to change WindowDataManager to use WAL once WAL is
> added to Malhar. I don't know when this plan changed.
>
> There is already a ticket created for it and I was going to work on it once
> WAL is moved to Malhar.
>
> https://datatorrent.atlassian.net/browse/SPOI-7116
>
> A related discussion also happened here:
>
> http://mail-archives.apache.org/mod_mbox/apex-dev/201511.mbox/%3ccakabfbma3rtg-uv7h8ao_cfz3nr4u8qpkbjmueggmrtge2h...@mail.gmail.com%3E
>
> I don't think we should add another utility which does what
> WindowDataManager does, because:
>
> 1. We would have duplicate utilities that try to achieve the same thing.
>
> 2. It would have a bigger impact on operators that already work with
> WindowDataManager, and would make them backward incompatible as well.
>
> 3. IMO creating an abstract RecoverableOperator is not a flexible design.
> Idempotency entails a higher cost. When it is a pluggable component of the
> operator, the user has the choice to turn it off by setting a NOOP data
> manager when they don't care about idempotency.
>
> 4. IMO it is always better to enhance or change existing components instead
> of adding new ones which are used for the same purpose.
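A minimal sketch of the pluggable approach in point 3, with hypothetical names throughout (`ReplayManager`, `InMemoryReplayManager`, `NoopReplayManager`, `PluggableOperator`; this is not the actual WindowDataManager API). The operator holds the recovery component as a field, so turning idempotency off is a configuration change rather than a different base class.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical recovery component; NOT the real Malhar WindowDataManager API.
interface ReplayManager<T> {
  void save(long windowId, List<T> tuples);

  List<T> load(long windowId); // null if nothing was saved for the window
}

// Keeps saved windows in memory; stands in for a persistent implementation.
class InMemoryReplayManager<T> implements ReplayManager<T> {
  private final Map<Long, List<T>> store = new HashMap<>();

  public void save(long windowId, List<T> tuples) {
    store.put(windowId, new ArrayList<>(tuples)); // copy: caller reuses its list
  }

  public List<T> load(long windowId) {
    return store.get(windowId);
  }
}

// No-op variant: no persistence cost, and no idempotent replay either.
class NoopReplayManager<T> implements ReplayManager<T> {
  public void save(long windowId, List<T> tuples) {
  }

  public List<T> load(long windowId) {
    return null;
  }
}

// The operator composes the component instead of inheriting recovery behavior.
class PluggableOperator {
  private ReplayManager<String> replayManager = new NoopReplayManager<>();
  private final List<String> currentWindow = new ArrayList<>();

  void setReplayManager(ReplayManager<String> manager) {
    this.replayManager = manager;
  }

  void process(String tuple) {
    currentWindow.add(tuple);
  }

  void endWindow(long windowId) {
    replayManager.save(windowId, currentWindow);
    currentWindow.clear();
  }

  List<String> replay(long windowId) {
    return replayManager.load(windowId);
  }
}
```

With the default no-op component nothing is stored and `replay` returns null; plugging in the in-memory component makes the same operator class replay its windows.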
>
> Thanks,
>
> Chandni
> On Jan 21, 2016 3:06 PM, "Tushar Gosavi" <[email protected]> wrote:
>
> > Hi All,
> > We are planning to add utility classes in Malhar to provide Write Ahead
> > Log capability to operators.
> >
> > Motivation:
> > Reconstructing the state of an operator - Some operators keep a huge state
> > in memory, which causes checkpointing overhead and slows down processing
> > if checkpoints happen frequently. Such an operator can benefit by keeping
> > its in-memory state transient and reconstructing that state from the
> > stored tuples. The operator will write tuples to the WAL as they arrive
> > and then process them. During recovery of the operator, the tuples are
> > read back from the WAL to reconstruct the in-memory state. (The processing
> > of tuples needs to be idempotent.)
> >
> > Operators which maintain their state on the file system will also benefit
> > from this, as they do not have to update the persisted state frequently.
> > They could update the persistent state only after enough data is
> > available in memory, and then apply that data to the persistent state. On
> > failure, the in-memory state will be reconstructed from the WAL.
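A minimal sketch of the batching pattern described above, assuming a `HashMap` stands in for the state on the file system and a `List` stands in for the WAL; `BatchingStateOperator` and its methods are hypothetical. Updates are logged first, buffered in memory, and applied to the persistent state in one batch once a threshold is reached; after a simulated failure only the log entries written since the last batch are replayed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: 'persisted' stands in for file-system state, 'wal' for the WAL.
class BatchingStateOperator {
  private final int flushThreshold;
  private final List<String[]> wal = new ArrayList<>();          // key/value updates, in order
  private final Map<String, String> persisted = new HashMap<>(); // "expensive" durable state
  private final Map<String, String> pending = new HashMap<>();   // transient, rebuilt from WAL
  private int walStart; // WAL entries before this index are already in 'persisted'

  BatchingStateOperator(int flushThreshold) {
    this.flushThreshold = flushThreshold;
  }

  void put(String key, String value) {
    wal.add(new String[] {key, value}); // log first, then update memory
    pending.put(key, value);
    if (pending.size() >= flushThreshold) { // enough distinct keys buffered
      persisted.putAll(pending);            // one batched write instead of many small ones
      pending.clear();
      walStart = wal.size();                // older log entries are no longer needed
    }
  }

  // Simulate losing the in-memory buffer and rebuilding it from the WAL.
  void recover() {
    pending.clear();
    for (int i = walStart; i < wal.size(); i++) {
      pending.put(wal.get(i)[0], wal.get(i)[1]);
    }
  }

  String get(String key) {
    return pending.containsKey(key) ? pending.get(key) : persisted.get(key);
  }

  int persistedSize() {
    return persisted.size();
  }
}
```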
> >
> > You can think of this functionality as similar to the buffer server, only
> > persisted on HDFS, and the operator can explicitly manage purging.
> >
> > WAL can also be used to provide an implementation of WindowDataManager,
> > which keeps the beginWindow and endWindow markers and information about
> > the tuples between them; this information is used to replay the tuples in
> > the same order. Using WAL will result in fewer files compared to
> > FSWindowDataManager.
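To illustrate the idea, here is a sketch that keeps window markers and tuples in one shared log, with hypothetical names (`LogRecord`, `WindowLog`) and a `List` standing in for the WAL. A window is replayed only if its end marker was written, so a window interrupted by a failure is not replayed as if it were complete.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical record types in a single shared log: window markers plus the
// tuples that arrived between them.
final class LogRecord {
  enum Kind { BEGIN_WINDOW, TUPLE, END_WINDOW }

  final Kind kind;
  final long windowId;
  final String tuple; // null for markers

  LogRecord(Kind kind, long windowId, String tuple) {
    this.kind = kind;
    this.windowId = windowId;
    this.tuple = tuple;
  }
}

// Sketch: all windows share one log instead of one file per window.
class WindowLog {
  private final List<LogRecord> log = new ArrayList<>();

  void beginWindow(long windowId) {
    log.add(new LogRecord(LogRecord.Kind.BEGIN_WINDOW, windowId, null));
  }

  void tuple(long windowId, String tuple) {
    log.add(new LogRecord(LogRecord.Kind.TUPLE, windowId, tuple));
  }

  void endWindow(long windowId) {
    log.add(new LogRecord(LogRecord.Kind.END_WINDOW, windowId, null));
  }

  // Tuples of a completed window in their original order, or null if the
  // window never reached its end marker (e.g. the operator failed mid-window).
  List<String> replay(long windowId) {
    List<String> tuples = null;
    for (LogRecord r : log) {
      if (r.windowId != windowId) {
        continue;
      }
      switch (r.kind) {
        case BEGIN_WINDOW:
          tuples = new ArrayList<>();
          break;
        case TUPLE:
          if (tuples != null) {
            tuples.add(r.tuple);
          }
          break;
        case END_WINDOW:
          return tuples;
      }
    }
    return null;
  }
}
```

The linear scan keeps the sketch short; a real implementation would more likely record each window's starting offset and seek to it.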
> >
> > General Design
> > We will introduce two interfaces:
> >
> > WALWriter - this will have the following methods:
> >  - append    : append data at the end of the WAL.
> >  - getOffset : return the offset which will need to be tracked for recovery.
> >  - flush     : make sure that the data is persisted on the external storage.
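A minimal sketch of the `WALWriter` contract described above. The proposal does not fix the entry or offset types, so `byte[]` entries and `long` byte offsets are assumptions, as is the 4-byte length prefix that lets a reader find entry boundaries; `InMemoryWALWriter` is hypothetical and stands in for a storage-backed writer.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

// Sketch of the proposed writer contract; entry and offset types are assumptions.
interface WALWriter {
  // Append one entry at the end of the log.
  void append(byte[] entry) throws IOException;

  // Offset just past the last appended entry; track it for recovery.
  long getOffset();

  // Ensure appended data has reached the underlying storage.
  void flush() throws IOException;
}

// Hypothetical in-memory implementation: each entry is stored as a 4-byte
// big-endian length followed by the entry bytes.
class InMemoryWALWriter implements WALWriter {
  private final ByteArrayOutputStream log = new ByteArrayOutputStream();

  @Override
  public void append(byte[] entry) {
    byte[] length = ByteBuffer.allocate(4).putInt(entry.length).array();
    log.write(length, 0, length.length);
    log.write(entry, 0, entry.length);
  }

  @Override
  public long getOffset() {
    return log.size();
  }

  @Override
  public void flush() {
    // Nothing to persist in memory; a file-backed writer would flush and sync here.
  }

  byte[] toByteArray() {
    return log.toByteArray();
  }
}
```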
> >
> > WALReader - this will provide an iterator-like interface for accessing
> > the WAL:
> >  - seek      : seek to a particular offset.
> >  - advance   : read the next entry; returns true if an entry is available.
> >  - get       : get the current entry read by advance.
> >  - getOffset : return the current offset in the WAL.
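A matching sketch of the iterator-like `WALReader`, under the same assumptions (`byte[]` entries, `long` offsets, 4-byte big-endian length prefix); `ByteArrayWALReader` is hypothetical and reads from an in-memory copy of the log.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Sketch of the proposed iterator-like reader contract; types are assumptions.
interface WALReader {
  void seek(long offset); // position the reader at an entry boundary

  boolean advance();      // read the next entry; false at end of log

  byte[] get();           // entry read by the last successful advance()

  long getOffset();       // offset of the next entry to be read
}

// Hypothetical reader over an in-memory log of [4-byte length][payload] records.
class ByteArrayWALReader implements WALReader {
  private final byte[] log;
  private int offset;
  private byte[] current;

  ByteArrayWALReader(byte[] log) {
    this.log = log;
  }

  @Override
  public void seek(long offset) {
    this.offset = (int) offset;
    this.current = null;
  }

  @Override
  public boolean advance() {
    if (offset + 4 > log.length) {
      return false; // no complete length header left
    }
    int length = ByteBuffer.wrap(log, offset, 4).getInt();
    if (offset + 4 + length > log.length) {
      return false; // truncated final entry, e.g. from a crash mid-write
    }
    current = Arrays.copyOfRange(log, offset + 4, offset + 4 + length);
    offset += 4 + length;
    return true;
  }

  @Override
  public byte[] get() {
    return current;
  }

  @Override
  public long getOffset() {
    return offset;
  }
}
```

Recovery would typically `seek()` to an offset saved earlier and then loop with `while (reader.advance()) { ... reader.get() ... }`. Stopping at a truncated final entry is only one possible policy; the proposal does not specify one.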
> >
> > The following implementations will be provided, which work with DFS
> > file systems. These classes will take a serializer for converting data to
> > a byte array before writing, and converting the byte array back to an
> > object while reading.
> >
> > - FSWALReader implements WALReader
> > - FSWALWriter implements WALWriter
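A sketch of how a serializer plugs into file-based writing and reading. It uses local files through `java.io`/`java.nio` as a stand-in for the DFS `FileSystem` API the proposal targets, and every name here (`Serializer`, `StringSerializer`, `LocalFileWALWriter`, `LocalFileWALReader`) is hypothetical; offset tracking and `seek` are omitted to keep it short.

```java
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical serializer contract: converts objects to and from byte arrays.
interface Serializer<T> {
  byte[] toBytes(T value);

  T fromBytes(byte[] bytes);
}

class StringSerializer implements Serializer<String> {
  public byte[] toBytes(String value) { return value.getBytes(StandardCharsets.UTF_8); }

  public String fromBytes(byte[] bytes) { return new String(bytes, StandardCharsets.UTF_8); }
}

// Appends length-prefixed, serialized entries to a local file (stand-in for a DFS file).
class LocalFileWALWriter<T> implements AutoCloseable {
  private final FileOutputStream file;
  private final DataOutputStream out;
  private final Serializer<T> serializer;

  LocalFileWALWriter(Path path, Serializer<T> serializer) throws IOException {
    this.file = new FileOutputStream(path.toFile(), true); // append mode
    this.out = new DataOutputStream(file);
    this.serializer = serializer;
  }

  void append(T entry) throws IOException {
    byte[] bytes = serializer.toBytes(entry);
    out.writeInt(bytes.length);
    out.write(bytes);
  }

  void flush() throws IOException {
    out.flush();
    file.getFD().sync(); // force the bytes to the storage device
  }

  public void close() throws IOException {
    out.close();
  }
}

// Reads entries back in order, deserializing each one.
class LocalFileWALReader<T> implements AutoCloseable {
  private final DataInputStream in;
  private final Serializer<T> serializer;
  private T current;

  LocalFileWALReader(Path path, Serializer<T> serializer) throws IOException {
    InputStream raw = Files.newInputStream(path);
    this.in = new DataInputStream(new BufferedInputStream(raw));
    this.serializer = serializer;
  }

  boolean advance() throws IOException {
    try {
      int length = in.readInt();
      byte[] bytes = new byte[length];
      in.readFully(bytes);
      current = serializer.fromBytes(bytes);
      return true;
    } catch (EOFException e) {
      return false; // end of log, or a partially written last entry
    }
  }

  T get() { return current; }

  public void close() throws IOException { in.close(); }
}
```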
> >
> >
> > RollingFSWALReader and RollingFSWALWriter - these implementations will
> > support rolling files based on the size of the log. They will internally
> > use FSWALReader and FSWALWriter for reading and writing log segments.
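A sketch of size-based rolling, assuming an offset becomes a (segment, position) pair and using in-memory buffers in place of segment files; `SegmentPointer` and `RollingLogWriter` are hypothetical. The writer starts a new segment whenever the next entry would push the current one past `maxSegmentBytes`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical position in a rolling log: which segment, and the byte offset in it.
final class SegmentPointer {
  final int segment;
  final long offset;

  SegmentPointer(int segment, long offset) {
    this.segment = segment;
    this.offset = offset;
  }
}

// Sketch of size-based rolling. In-memory buffers stand in for the segment
// files that single-file writers would manage.
class RollingLogWriter {
  private final long maxSegmentBytes;
  private final List<ByteArrayOutputStream> segments = new ArrayList<>();

  RollingLogWriter(long maxSegmentBytes) {
    this.maxSegmentBytes = maxSegmentBytes;
    segments.add(new ByteArrayOutputStream());
  }

  void append(byte[] entry) {
    ByteArrayOutputStream current = segments.get(segments.size() - 1);
    if (current.size() > 0 && current.size() + 4 + entry.length > maxSegmentBytes) {
      // Roll before the segment would exceed the limit; an entry larger than
      // the limit still gets a segment of its own.
      current = new ByteArrayOutputStream();
      segments.add(current);
    }
    byte[] length = ByteBuffer.allocate(4).putInt(entry.length).array();
    current.write(length, 0, length.length);
    current.write(entry, 0, entry.length);
  }

  SegmentPointer getOffset() {
    int last = segments.size() - 1;
    return new SegmentPointer(last, segments.get(last).size());
  }

  int segmentCount() {
    return segments.size();
  }
}
```

A likely benefit of segments is that purging can delete whole segments that lie entirely before the start marker, instead of rewriting one large file.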
> >
> > WALManager - this interface will provide the following methods:
> >  - setup() : set up the WAL implementation and the serializer to use.
> >  - runRecovery(Recoverable obj) : Recoverable is also an interface, with
> >    just one method, recover(tuple), which recovers a tuple read from the
> >    WAL.
> >  - setStart(WALPointer start) : set the start marker; during recovery the
> >    WAL will start reading tuples from this offset. The operator will call
> >    this method to specify that the data before the start pointer is not
> >    needed.
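A minimal in-memory sketch of the `WALManager` flow, assuming offsets are entry indexes rather than byte offsets; `SimpleWALManager` and its `write` method are hypothetical, while `recover(tuple)`, `runRecovery` and `setStart` follow the names in the proposal. Recovery replays, in write order, only the tuples at or after the start marker.

```java
import java.util.ArrayList;
import java.util.List;

// Callback for tuples read back from the WAL during recovery.
interface Recoverable<T> {
  void recover(T tuple);
}

// Hypothetical in-memory WAL manager; offsets are entry indexes, not bytes.
class SimpleWALManager<T> {
  private final List<T> log = new ArrayList<>();
  private long start; // entries before this offset are no longer needed

  // Append a tuple and return the offset just past it.
  long write(T tuple) {
    log.add(tuple);
    return log.size();
  }

  // Mark everything before 'offset' as unnecessary for recovery.
  void setStart(long offset) {
    if (offset < start || offset > log.size()) {
      throw new IllegalArgumentException("invalid start offset: " + offset);
    }
    start = offset;
  }

  // Replay every tuple from the start marker onward, in write order.
  void runRecovery(Recoverable<T> target) {
    for (int i = (int) start; i < log.size(); i++) {
      target.recover(log.get(i));
    }
  }
}
```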
> >
> >
> > For example, an abstract implementation of RecoverableOperator could be:
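> > ```java
> > import java.io.IOException;
> >
> > import com.datatorrent.api.Context;
> > import com.datatorrent.api.DefaultInputPort;
> > import com.datatorrent.api.Operator;
> > import com.datatorrent.common.util.BaseOperator;
> >
> > // WAL.Recoverable, WALManager, WALPointer and Serializer are the classes
> > // proposed in this mail.
> > public abstract class RecoverableOperator<T> extends BaseOperator
> >   implements WAL.Recoverable<T>, Operator.CheckpointListener
> > {
> >   private WALManager<T> wm;
> >   // Converts tuples to and from byte arrays for the WAL.
> >   private Serializer<T> serializer;
> >   // WAL position before which data is no longer needed (maintained by the operator).
> >   private WALPointer pointer;
> >
> >   public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
> >   {
> >     @Override
> >     public void process(T t)
> >     {
> >       processTuple(t, false);
> >     }
> >   };
> >
> >   @Override
> >   public void setup(Context.OperatorContext context)
> >   {
> >     wm.setup(context, serializer);
> >     /* build any in-memory state */
> >     // Replay the WAL from the start marker; each tuple comes back via recover().
> >     wm.runRecovery(this);
> >   }
> >
> >   private void processTuple(T tuple, boolean recovery)
> >   {
> >     // During normal processing, write the tuple to the WAL before processing it.
> >     // During recovery the tuple was read from the WAL, so don't write it again.
> >     if (!recovery) {
> >       try {
> >         wm.write(tuple);
> >       } catch (IOException e) {
> >         throw new RuntimeException(e);
> >       }
> >     }
> >     processTuple(tuple);
> >   }
> >
> >   @Override
> >   public void recover(T tuple)
> >   {
> >     processTuple(tuple, true);
> >   }
> >
> >   @Override
> >   public void checkpointed(long windowId)
> >   {
> >   }
> >
> >   @Override
> >   public void committed(long windowId)
> >   {
> >     // Data before the pointer is no longer needed, so move the WAL start marker.
> >     wm.setStart(pointer);
> >   }
> >
> >   protected abstract void processTuple(T tuple);
> > }
> > ```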
> >
> > Let me know your thoughts.
> >
> >  -Tushar.
> >
>
