Hi Ashish,

Nice, looking forward to seeing the support for Geode! Have you created any JIRAs for your work yet?
Just in case you have not seen it yet, here are the contributor guidelines: http://apex.incubator.apache.org/contributing.html

Thanks,
Thomas

On Fri, Dec 4, 2015 at 9:49 AM, Ashish Tadose <[email protected]> wrote:
> Gaurav, Sandesh
>
> PFB my comments in *bold*
>
> 1. Are there standard APIs for distributed In-Memory stores or is this implementation specific to one particular tool?
> *I have developed a concrete implementation with Apache Geode - http://geode.incubator.apache.org/*
> *However, for this feature contribution I am adding a KeyValue store interface and an abstract implementation to plug in any KeyValue store as a storage agent.*
>
> 2. Will In-Memory Store compete with DataTorrent Apps for cluster resources (memory/cpu)?
> *Probably not; the in-memory store would be a separately managed cluster which need not be part of the YARN environment.*
>
> 3. What is the purging policy? Who is responsible for cleaning up the resources for completed/failed/aborted applications? This becomes important when you want to launch an Application using a previous Application Id.
> *In-memory storage would support checkpoint deletion, which the platform calls periodically during the application lifetime.*
> *Purging the checkpoints of older applications will be taken care of by the application developer or the admin who manages the in-memory cluster; the same is the case with the HDFS storage agents, where the user has to manually delete old application and checkpoint data.*
>
> 4. Which in-memory stores did you evaluate? Are they YARN compatible?
> *I have a concrete implementation of a Geode storage agent which I would be contributing along with this feature.*
>
> Thanks,
> Ashish
>
>
> On Fri, Dec 4, 2015 at 12:45 AM, Sandesh Hegde <[email protected]> wrote:
>
> > Ashish,
> >
> > Two more questions for you:
> > Which in-memory stores did you evaluate? Are they YARN compatible?
> >
> > Thank you for your contribution.
> >
> > Sandesh
> >
> > On Wed, Dec 2, 2015 at 10:53 AM Gaurav Gupta <[email protected]> wrote:
> >
> > > Ashish,
> > >
> > > I have a couple of questions:
> > > 1. Are there standard APIs for distributed In-Memory stores or is this implementation specific to one particular tool?
> > > 2. Will In-Memory Store compete with DataTorrent Apps for cluster resources (memory/cpu)?
> > > 3. What is the purging policy? Who is responsible for cleaning up the resources for completed/failed/aborted applications? This becomes important when you want to launch an Application using a previous Application Id.
> > >
> > > Thanks
> > > - Gaurav
> > >
> > > > On Dec 2, 2015, at 10:07 AM, Ashish Tadose <[email protected]> wrote:
> > > >
> > > > Thanks Gaurav,
> > > >
> > > > I have finished baseline implementations of the StorageAgent and also tested it with demo applications by explicitly specifying it in the DAG configuration as below, and it works fine.
> > > >
> > > > dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
> > > >
> > > > I also had to make some changes to StramClient to pass additional information such as the applicationId, as it isn't passed currently.
> > > >
> > > > I am going to create a JIRA task for this feature and will document the design & implementation strategy there.
> > > >
> > > > Thx,
> > > > Ashish
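Side note for the archives: wiring a custom agent through that attribute would look roughly like the sketch below. GeodeStorageAgent here is only a placeholder name for the proposed agent (it is not an existing Apex class) and its constructor argument is illustrative.

    import org.apache.hadoop.conf.Configuration;

    import com.datatorrent.api.Context.OperatorContext;
    import com.datatorrent.api.DAG;
    import com.datatorrent.api.StorageAgent;
    import com.datatorrent.api.StreamingApplication;

    public class InMemoryCheckpointApp implements StreamingApplication
    {
      @Override
      public void populateDAG(DAG dag, Configuration conf)
      {
        // Placeholder for the proposed agent backed by an external in-memory store
        // (e.g. Geode); not an existing Apex class, argument is illustrative only.
        StorageAgent agent = new GeodeStorageAgent("geode-locator-host:10334");

        // The same call quoted above: make the operators in the DAG checkpoint
        // through the custom agent instead of the default HDFS-backed one.
        dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);

        // ... operators and streams are added to the DAG as usual ...
      }
    }

Since STORAGE_AGENT is an OperatorContext attribute, it should also be possible to set it per operator rather than for the whole DAG.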
> > > >
> > > > On Wed, Dec 2, 2015 at 11:26 PM, Gaurav Gupta <[email protected]> wrote:
> > > >
> > > >> Just to add, you can plug in your storage agent using the attribute STORAGE_AGENT (https://www.datatorrent.com/docs/apidocs/com/datatorrent/api/Context.OperatorContext.html#STORAGE_AGENT)
> > > >>
> > > >> Thanks
> > > >> - Gaurav
> > > >>
> > > >>> On Dec 2, 2015, at 9:51 AM, Gaurav Gupta <[email protected]> wrote:
> > > >>>
> > > >>> Ashish,
> > > >>>
> > > >>> You are right that exactly-once semantics can’t be achieved through async FS writes.
> > > >>> Did you try the new StorageAgent with your application? If yes, do you have any numbers to compare?
> > > >>>
> > > >>> Thanks
> > > >>> - Gaurav
> > > >>>
> > > >>>> On Dec 2, 2015, at 9:33 AM, Ashish Tadose <[email protected]> wrote:
> > > >>>>
> > > >>>> The application uses a large number of in-memory dimension store partitions to hold high-cardinality aggregated data, and many intermediate operators also keep cached data for reference lookups, which is non-transient.
> > > >>>>
> > > >>>> Total application partitions were more than 1000, which means a lot of operators to checkpoint and in turn a lot of frequent HDFS write, rename & delete operations, which became a bottleneck.
> > > >>>>
> > > >>>> The application requires exactly-once semantics with idempotent operators, which I suppose cannot be achieved through async FS writes; please correct me if I'm wrong here.
> > > >>>>
> > > >>>> Also, the application computes streaming aggregations over high-cardinality incoming data streams and the reference caches are updated frequently, so I am not sure how much incremental checkpointing will help here.
> > > >>>>
> > > >>>> Beyond this specific application, I strongly think it would be good to have a StorageAgent backed by a distributed in-memory store as an alternative in the platform.
> > > >>>>
> > > >>>> Ashish
> > > >>>>
> > > >>>>
> > > >>>> On Wed, Dec 2, 2015 at 10:35 PM, Munagala Ramanath <[email protected]> wrote:
> > > >>>>
> > > >>>>> Ashish,
> > > >>>>>
> > > >>>>> In the current release, the HDFS writes are asynchronous, so I'm wondering if you could elaborate on how much latency you are observing both with and without checkpointing (i.e. after your changes to make operators stateless).
> > > >>>>>
> > > >>>>> Also, any information on how much non-transient data is being checkpointed in each operator would be useful. There is an effort under way to implement incremental checkpointing, which should improve things when there is a lot of state but very little that changes from window to window.
> > > >>>>>
> > > >>>>> Ram
> > > >>>>>
> > > >>>>>
> > > >>>>> On Wed, Dec 2, 2015 at 8:51 AM, Ashish Tadose <[email protected]> wrote:
> > > >>>>>
> > > >>>>>> Hi All,
> > > >>>>>>
> > > >>>>>> Currently the Apex engine provides operator checkpointing in HDFS (with HDFS-backed StorageAgents, i.e. FSStorageAgent & AsyncFSStorageAgent).
> > > >>>>>>
> > > >>>>>> We have observed that for applications having a large number of operator instances, HDFS checkpointing introduces latency in the DAG which degrades overall application performance. To resolve this we had to review all operators in the DAG and make a few operators stateless.
> > > >>>>>>
> > > >>>>>> As operator checkpointing is critical functionality of the Apex streaming platform to ensure fault-tolerant behavior, the platform should also provide alternate StorageAgents which work seamlessly with large applications that require exactly-once semantics.
> > > >>>>>>
> > > >>>>>> HDFS read/write latency is limited and doesn't improve beyond a certain point because of disk I/O & staging writes. Having an alternate strategy that checkpoints into a fault-tolerant distributed in-memory grid would ensure application stability and performance are not impacted.
> > > >>>>>>
> > > >>>>>> I have developed an in-memory storage agent which I would like to contribute as an alternate StorageAgent for checkpointing.
> > > >>>>>>
> > > >>>>>> Thanks,
> > > >>>>>> Ashish
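To make the proposal a bit more concrete for the archives, below is a minimal sketch of what a key-value-backed StorageAgent along these lines might look like. It assumes the save/load/delete/getWindowIds contract of com.datatorrent.api.StorageAgent; the KeyValueStore interface, the KeyValueStorageAgent name, the key layout, and the use of plain Java serialization are all illustrative stand-ins, not the actual contribution.

    import java.io.*;
    import java.util.List;

    import com.datatorrent.api.StorageAgent;

    /**
     * Minimal sketch of a checkpoint agent backed by a distributed key-value store.
     * KeyValueStore is a hypothetical client abstraction over a store such as Geode.
     */
    public class KeyValueStorageAgent implements StorageAgent, Serializable
    {
      /** Hypothetical client interface to the external in-memory store. */
      public interface KeyValueStore extends Serializable
      {
        void put(String key, byte[] value);
        byte[] get(String key);
        void remove(String key);
        List<String> keysWithPrefix(String prefix);
      }

      private final KeyValueStore store;
      // The applicationId Ashish mentions passing through StramClient; it namespaces
      // checkpoints so multiple applications can share one store.
      private final String applicationId;

      public KeyValueStorageAgent(KeyValueStore store, String applicationId)
      {
        this.store = store;
        this.applicationId = applicationId;
      }

      private String key(int operatorId, long windowId)
      {
        return applicationId + "/" + operatorId + "/" + windowId;
      }

      @Override
      public void save(Object object, int operatorId, long windowId) throws IOException
      {
        // Serialize the operator state and write it under application/operator/window.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
          oos.writeObject(object);
        }
        store.put(key(operatorId, windowId), bos.toByteArray());
      }

      @Override
      public Object load(int operatorId, long windowId) throws IOException
      {
        byte[] bytes = store.get(key(operatorId, windowId));
        if (bytes == null) {
          throw new IOException("No checkpoint for operator " + operatorId + " window " + windowId);
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
          return ois.readObject();
        } catch (ClassNotFoundException e) {
          throw new IOException(e);
        }
      }

      @Override
      public void delete(int operatorId, long windowId) throws IOException
      {
        // Called by the platform to purge committed checkpoints during the application lifetime.
        store.remove(key(operatorId, windowId));
      }

      @Override
      public long[] getWindowIds(int operatorId) throws IOException
      {
        // Recover the set of checkpointed windows for an operator, e.g. on restart/recovery.
        List<String> keys = store.keysWithPrefix(applicationId + "/" + operatorId + "/");
        long[] windowIds = new long[keys.size()];
        for (int i = 0; i < keys.size(); i++) {
          String k = keys.get(i);
          windowIds[i] = Long.parseLong(k.substring(k.lastIndexOf('/') + 1));
        }
        return windowIds;
      }
    }

As the purging discussion above notes, delete() only covers checkpoints of the running application; cleaning up data left behind by finished or killed applications would remain an operational task on the store side.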
