I have created a pull request with the changes https://github.com/apache/incubator-apex-malhar/pull/204
-Tushar.

On Sat, Feb 27, 2016 at 12:24 AM, Thomas Weise <[email protected]> wrote:

> Tushar, any update on this?
>
> On Sun, Jan 24, 2016 at 9:19 PM, Thomas Weise <[email protected]> wrote:
>
> > Package should then be org.apache.apex.malhar.lib.wal
> >
> > I would suggest to later put the WindowDataManager there as well. Chandni?
> >
> > On Thu, Jan 21, 2016 at 11:04 PM, Tushar Gosavi <[email protected]> wrote:
> >
> >> I will put these utilities under library, as they do not depend on external packages.
> >> What should the package name be, com.datatorrent.library.wal?
> >> Or should we switch to the Apache Apex namespace, like org.apache.apex.lib.wal?
> >>
> >> - Tushar.
> >>
> >> On Thu, Jan 21, 2016 at 10:21 PM, Thomas Weise <[email protected]> wrote:
> >>
> >> > Makes sense. Let's keep the basic WAL infrastructure and patterns on top of it separate.
> >> >
> >> > On Thu, Jan 21, 2016 at 8:48 AM, Tushar Gosavi <[email protected]> wrote:
> >> >
> >> > > Hi Chandni,
> >> > >
> >> > > There is no change in plan. I will be working on the basic WAL infrastructure.
> >> > > I have not discussed how to implement WindowDataManager using WAL;
> >> > > I know that you will be working on it.
> >> > >
> >> > > This mail is to declare the interfaces for the Write Ahead Log and the
> >> > > file-system-based implementations of it. The classes I will be adding are:
> >> > > - WALReader - Interface.
> >> > > - WALWriter - Interface.
> >> > > - FSWALReader - Implementation of WALReader which reads from FS.
> >> > > - FSWALWriter - Implementation of WALWriter which writes to FS.
> >> > > - RollingFSWALReader - Implementation.
> >> > > - RollingFSWALWriter - Implementation.
> >> > > - WALManager - Interface.
> >> > > - AbstractWALManager - Functionality abstracted out of the WALManager in HDHT.
> >> > >
> >> > > RecoverableOperator is not for idempotency. It is an abstracted-out pattern
> >> > > for operators like HDHT. It can be added later if it is a common pattern.
> >> > >
> >> > > - Tushar.
> >> > >
> >> > > On Thu, Jan 21, 2016 at 6:07 PM, Chandni Singh <[email protected]> wrote:
> >> > >
> >> > > > Hi Tushar,
> >> > > > I thought the plan was to change WindowDataManager to use WAL once WAL is
> >> > > > added to Malhar. I don't know when this plan changed.
> >> > > >
> >> > > > There is already a ticket created for it and I was going to work on it once
> >> > > > WAL is moved to Malhar.
> >> > > >
> >> > > > https://datatorrent.atlassian.net/browse/SPOI-7116
> >> > > >
> >> > > > Also a related discussion happened here:
> >> > > >
> >> > > > http://mail-archives.apache.org/mod_mbox/apex-dev/201511.mbox/%3ccakabfbma3rtg-uv7h8ao_cfz3nr4u8qpkbjmueggmrtge2h...@mail.gmail.com%3E
> >> > > >
> >> > > > I don't think we should add another utility which does what
> >> > > > WindowDataManager does, because:
> >> > > >
> >> > > > 1. It creates duplicate utilities which try to achieve the same thing.
> >> > > >
> >> > > > 2. It has a bigger impact on operators that already work with WindowDataManager.
> >> > > > This will make them backward incompatible as well.
> >> > > >
> >> > > > 3. IMO creating an abstract RecoverableOperator is not a flexible design.
> >> > > > Idempotency entails higher cost. When it is a pluggable component of the
> >> > > > operator, the user has a choice to turn it off by setting a NOOP data manager
> >> > > > when they don't care about idempotency.
> >> > > >
> >> > > > 4. IMO it is always better to enhance/change existing components instead of
> >> > > > adding new ones which are used for the same purpose.
> >> > > >
> >> > > > Thanks,
> >> > > >
> >> > > > Chandni
> >> > > > On Jan 21, 2016 3:06 PM, "Tushar Gosavi" <[email protected]> wrote:
> >> > > >
> >> > > > > Hi All,
> >> > > > > We are planning to add utility classes in Malhar for providing Write Ahead
> >> > > > > Log capability to the operators.
> >> > > > >
> >> > > > > Motivation:
> >> > > > > Reconstructing state of operator - Some operators keep huge state in
> >> > > > > memory, which causes checkpointing overhead and slows down
> >> > > > > the processing if checkpoints are happening frequently. Such operators can
> >> > > > > benefit by keeping in-memory state as transient and allowing
> >> > > > > reconstruction of the state from the stored tuples. The operator will write
> >> > > > > tuples to the WAL as they arrive and will then process them.
> >> > > > > During recovery of the operator, the tuples are read back from the WAL to
> >> > > > > reconstruct the in-memory state. (The processing of tuples needs to be idempotent.)
> >> > > > >
> >> > > > > Operators which maintain their state on the file system will also benefit
> >> > > > > from this, as they do not have to update persisted state frequently. They
> >> > > > > could update persistent state after enough data is available in memory and
> >> > > > > apply that data to persistent state. On failure, in-memory state
> >> > > > > will be reconstructed from the WAL.
> >> > > > >
> >> > > > > You can think of this functionality as similar to the buffer server, only
> >> > > > > persisted on HDFS, and the operator can explicitly manage purging.
> >> > > > >
> >> > > > > WAL can also be used to provide an implementation of WindowDataManager, which
> >> > > > > keeps information about beginWindow and endWindow markers and
> >> > > > > about the tuples between them; this information is used to replay the
> >> > > > > tuples in the same order. Using WAL will result in fewer files compared to
> >> > > > > FSWindowDataManager.
> >> > > > >
> >> > > > > General Design
> >> > > > > We will introduce two interfaces.
> >> > > > >
> >> > > > > WALWriter - This will have the following methods:
> >> > > > > - append : Append the data at the end of the WAL.
> >> > > > > - getOffset : Return the offset which will need to be tracked for recovery.
> >> > > > > - flush : Make sure that data is persisted on the external storage.
> >> > > > >
> >> > > > > WALReader - This will provide an iterator-like interface for access
> >> > > > > to the WAL.
> >> > > > > - seek : Seek to a particular offset.
> >> > > > > - advance : Read the next entry; returns true if an entry is available.
> >> > > > > - get : Get the current entry read by advance.
> >> > > > > - getOffset : Return the current offset in the WAL.
> >> > > > >
> >> > > > > The following implementations will be provided, which work with DFS
> >> > > > > file systems. These classes will take a serializer for converting data to a
> >> > > > > byte array before writing and for converting a byte array back to an object
> >> > > > > while reading.
> >> > > > >
> >> > > > > - FSWALReader implements WALReader
> >> > > > > - FSWALWriter implements WALWriter
> >> > > > >
> >> > > > > RollingFSWALReader, RollingFSWALWriter - These implementations will support
> >> > > > > rolling files based on the size of the log. They will internally use
> >> > > > > FSWALReader and FSWALWriter for reading and writing log segments.
> >> > > > >
> >> > > > > WALManager - This interface will provide the following methods:
> >> > > > > - setup() : Set up the WAL implementation and the serializer to use.
> >> > > > > - runRecovery(Recoverable obj) : Recoverable is also an interface,
> >> > > > > having just one method, recover(tuple), to recover a tuple which is read
> >> > > > > from the WAL.
> >> > > > > - setStart(WALPointer start) : Set the marker; during recovery the WAL will
> >> > > > > start reading tuples from this offset. The operator will call this method to
> >> > > > > specify that the data before the start pointer is not needed.
> >> > > > >
> >> > > > > For example, the abstract implementation of RecoverableOperator can be
> >> > > > > ```java
> >> > > > > public abstract class RecoverableOperator<T> extends BaseOperator
> >> > > > >     implements WAL.Recoverable<T>, Operator.CheckpointListener
> >> > > > > {
> >> > > > >   private WALManager<T> wm;
> >> > > > >   // Start pointer for recovery; how it is tracked per window is left open here.
> >> > > > >   private WALPointer pointer;
> >> > > > >
> >> > > > >   public transient DefaultInputPort<T> input = new DefaultInputPort<T>()
> >> > > > >   {
> >> > > > >     @Override
> >> > > > >     public void process(T t)
> >> > > > >     {
> >> > > > >       processTuple(t, false);
> >> > > > >     }
> >> > > > >   };
> >> > > > >
> >> > > > >   @Override
> >> > > > >   public void setup(Context.OperatorContext context)
> >> > > > >   {
> >> > > > >     wm.setup(context, new Serializer<T>());
> >> > > > >     /* build any in-memory state */
> >> > > > >     wm.runRecovery(this);
> >> > > > >   }
> >> > > > >
> >> > > > >   void processTuple(T tuple, boolean recovery)
> >> > > > >   {
> >> > > > >     // During normal processing, write the tuple to the WAL first.
> >> > > > >     // During recovery the tuple is already in the WAL, so don't write it again.
> >> > > > >     if (!recovery) {
> >> > > > >       try {
> >> > > > >         wm.write(tuple);
> >> > > > >       } catch (IOException e) {
> >> > > > >         // process() cannot throw a checked exception, so wrap it
> >> > > > >         throw new RuntimeException(e);
> >> > > > >       }
> >> > > > >     }
> >> > > > >     processTuple(tuple);
> >> > > > >   }
> >> > > > >
> >> > > > >   @Override
> >> > > > >   public void recover(T tuple)
> >> > > > >   {
> >> > > > >     processTuple(tuple, true);
> >> > > > >   }
> >> > > > >
> >> > > > >   @Override
> >> > > > >   public void committed(long windowId)
> >> > > > >   {
> >> > > > >     // Move the start pointer forward if the data before it is not needed.
> >> > > > >     wm.setStart(pointer);
> >> > > > >   }
> >> > > > >
> >> > > > >   protected abstract void processTuple(T tuple);
> >> > > > > }
> >> > > > > ```
> >> > > > >
> >> > > > > Let me know your thoughts.
> >> > > > >
> >> > > > > -Tushar.
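
For readers following the thread, here is a minimal in-memory sketch of the WALWriter/WALReader idea described in the proposal. It is not the Malhar API: the class names WalSketch and InMemoryWal, the replay helper, the return value of append, and the use of entry indexes as offsets are all assumptions made for illustration. A real implementation would write serialized entries to a DFS file.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory sketch of the proposed WAL interfaces; not the Malhar API. */
public class WalSketch {

  /** Writer side: append, getOffset and flush, as named in the proposal. */
  public interface WALWriter {
    long append(byte[] entry); // returning the entry's offset is an assumption
    long getOffset();          // offset that the next append will receive
    void flush();              // a real writer would force data to storage here
  }

  /** Reader side: iterator-like access to the log. */
  public interface WALReader {
    void seek(long offset);
    boolean advance();         // true if an entry was read
    byte[] get();              // entry read by the last successful advance
    long getOffset();
  }

  /** Toy log kept in a list; offsets are simply entry indexes (assumption). */
  public static class InMemoryWal implements WALWriter {
    private final List<byte[]> entries = new ArrayList<>();

    @Override public long append(byte[] entry) {
      entries.add(entry.clone());
      return entries.size() - 1;
    }
    @Override public long getOffset() { return entries.size(); }
    @Override public void flush() { /* nothing to persist in memory */ }

    public WALReader newReader() {
      return new WALReader() {
        private long next = 0;
        private byte[] current;

        @Override public void seek(long offset) { next = offset; current = null; }
        @Override public boolean advance() {
          if (next >= entries.size()) {
            return false;
          }
          current = entries.get((int) next++);
          return true;
        }
        @Override public byte[] get() { return current; }
        @Override public long getOffset() { return next; }
      };
    }
  }

  /** Replays every entry from the start offset onward, as a recovering operator might. */
  public static List<String> replay(InMemoryWal wal, long start) {
    List<String> out = new ArrayList<>();
    WALReader reader = wal.newReader();
    reader.seek(start);
    while (reader.advance()) {
      out.add(new String(reader.get(), StandardCharsets.UTF_8));
    }
    return out;
  }

  public static void main(String[] args) {
    InMemoryWal wal = new InMemoryWal();
    for (String s : new String[] {"a", "b", "c"}) {
      wal.append(s.getBytes(StandardCharsets.UTF_8));
    }
    wal.flush();
    System.out.println(replay(wal, 1)); // prints [b, c]
  }
}
```

Replaying from offset 1 skips the first entry, which mirrors what setStart is meant to achieve: entries before the start pointer are not needed for recovery.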

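
The size-based rolling and the setStart purge described in the original proposal could be sketched roughly as follows. Everything here is an assumption for illustration: RollingWalSketch, the fields of WALPointer, and the in-memory lists that stand in for log segment files. The thread itself only states that rolling is based on log size and that data before the start pointer is not needed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of size-based rolling; segments are lists here, not DFS files. */
public class RollingWalSketch {

  /** Pointer into a rolling log: segment number plus entry index (assumed layout). */
  public static final class WALPointer {
    public final int segment;
    public final long offset;

    public WALPointer(int segment, long offset) {
      this.segment = segment;
      this.offset = offset;
    }

    @Override public String toString() {
      return "segment=" + segment + ", offset=" + offset;
    }
  }

  private final long maxSegmentBytes;
  private final List<List<byte[]>> segments = new ArrayList<>();
  private long currentBytes = 0;
  private int firstLiveSegment = 0;

  public RollingWalSketch(long maxSegmentBytes) {
    this.maxSegmentBytes = maxSegmentBytes;
    segments.add(new ArrayList<>());
  }

  /** Appends an entry, rolling to a new segment first if the current one would overflow. */
  public WALPointer append(byte[] entry) {
    if (currentBytes > 0 && currentBytes + entry.length > maxSegmentBytes) {
      segments.add(new ArrayList<>());
      currentBytes = 0;
    }
    List<byte[]> current = segments.get(segments.size() - 1);
    current.add(entry.clone());
    currentBytes += entry.length;
    return new WALPointer(segments.size() - 1, current.size() - 1);
  }

  /** Drops whole segments that lie entirely before the start pointer. */
  public void setStart(WALPointer start) {
    for (int i = firstLiveSegment; i < start.segment; i++) {
      segments.set(i, null); // a file-based log would delete the segment file here
    }
    firstLiveSegment = Math.max(firstLiveSegment, start.segment);
  }

  public int segmentCount() { return segments.size(); }

  public int liveSegmentCount() { return segments.size() - firstLiveSegment; }

  public static void main(String[] args) {
    RollingWalSketch wal = new RollingWalSketch(10);
    WALPointer third = null;
    for (int i = 0; i < 4; i++) {
      WALPointer p = wal.append(new byte[4]); // 4-byte entries, 10-byte segments
      if (i == 2) {
        third = p;
      }
    }
    System.out.println(wal.segmentCount() + " segments, third entry at " + third);
    wal.setStart(third);
    System.out.println(wal.liveSegmentCount() + " live segment(s) after purge");
  }
}
```

With 4-byte entries and a 10-byte limit, the third append rolls to a second segment, and setting the start pointer there lets the first segment be purged as a whole. Purging at segment granularity is one simple choice; it keeps the writer append-only.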