Hi Thomas,

I have created the JIRA below and added a pull request for code review.
https://issues.apache.org/jira/browse/APEXCORE-283
We have also developed input & output operators for Geode which we will be
contributing soon.

Thx,
Ashish

On Sat, Dec 5, 2015 at 12:02 AM, Thomas Weise <[email protected]> wrote:

> Hi Ashish,
>
> Nice, looking forward to seeing the support for Geode! Have you created any
> JIRAs for your work yet?
>
> Just in case you have not seen it yet, here are the contributor guidelines:
> http://apex.incubator.apache.org/contributing.html
>
> Thanks,
> Thomas
>
> On Fri, Dec 4, 2015 at 9:49 AM, Ashish Tadose <[email protected]> wrote:
>
> > Gaurav, Sandesh
> >
> > PFB my comments in *bold*
> >
> > 1. Are there standard APIs for distributed In-Memory stores or is this
> > implementation specific to one particular tool?
> > *I have developed a concrete implementation with Apache Geode -
> > http://geode.incubator.apache.org/*
> > *However, for this feature contribution I am adding a KeyValue store
> > interface and an abstract implementation so that any KeyValue store can
> > be plugged in as a storage agent.*
> >
> > 2. Will the In-Memory store compete with DataTorrent apps for cluster
> > resources (memory/cpu)?
> > *Probably not; the in-memory store would be a separately managed cluster
> > which need not be part of the YARN environment.*
> >
> > 3. What is the purging policy? Who is responsible for cleaning up the
> > resources for completed/failed/aborted applications? This becomes
> > important when you want to launch an application using a previous
> > application id.
> > *The in-memory storage would support deleting checkpoints, which the
> > platform does periodically during the application lifetime.*
> > *Purging the checkpoints of older applications will be taken care of by
> > the application developer or the admin who is managing the in-memory
> > cluster; the same is the case with the HDFS storage agents, where the
> > user has to manually delete old apps' checkpoint data.*
> >
> > 4. Which in-memory stores did you evaluate? Are they YARN compatible?
> > *I have a concrete implementation of a Geode storage agent which I would
> > be contributing along with this feature.*
> >
> > Thanks,
> > Ashish
> >
> >
> > On Fri, Dec 4, 2015 at 12:45 AM, Sandesh Hegde <[email protected]> wrote:
> >
> > > Ashish,
> > >
> > > Two more questions for you:
> > > Which in-memory stores did you evaluate? Are they YARN compatible?
> > >
> > > Thank you for your contribution.
> > >
> > > Sandesh
> > >
> > > On Wed, Dec 2, 2015 at 10:53 AM Gaurav Gupta <[email protected]> wrote:
> > >
> > > > Ashish,
> > > >
> > > > I have a couple of questions:
> > > > 1. Are there standard APIs for distributed In-Memory stores or is this
> > > > implementation specific to one particular tool?
> > > > 2. Will the In-Memory store compete with DataTorrent apps for cluster
> > > > resources (memory/cpu)?
> > > > 3. What is the purging policy? Who is responsible for cleaning up the
> > > > resources for completed/failed/aborted applications? This becomes
> > > > important when you want to launch an application using a previous
> > > > application id.
> > > >
> > > > Thanks
> > > > - Gaurav
> > > >
> > > > > On Dec 2, 2015, at 10:07 AM, Ashish Tadose <[email protected]> wrote:
> > > > >
> > > > > Thanks Gaurav,
> > > > >
> > > > > I have finished baseline implementations of the StorageAgent and also
> > > > > tested it with demo applications by explicitly specifying it in the
> > > > > DAG configuration as below, and it works fine.
> > > > >
> > > > > dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
> > > > >
> > > > > I also had to make some changes to StramClient to pass additional
> > > > > information such as the applicationId, as it doesn't pass that
> > > > > currently.
> > > > >
> > > > > I am going to create a JIRA task for this feature and will document
> > > > > the design & implementation strategy there.
> > > > >
> > > > > Thx,
> > > > > Ashish
> > > > >
> > > > >
> > > > > On Wed, Dec 2, 2015 at 11:26 PM, Gaurav Gupta <[email protected]> wrote:
> > > > >
> > > > >> Just to add, you can plug in your storage agent using the attribute
> > > > >> STORAGE_AGENT (
> > > > >> https://www.datatorrent.com/docs/apidocs/com/datatorrent/api/Context.OperatorContext.html#STORAGE_AGENT
> > > > >> )
> > > > >>
> > > > >> Thanks
> > > > >> - Gaurav
> > > > >>
> > > > >>> On Dec 2, 2015, at 9:51 AM, Gaurav Gupta <[email protected]> wrote:
> > > > >>>
> > > > >>> Ashish,
> > > > >>>
> > > > >>> You are right that exactly-once semantics can't be achieved through
> > > > >>> async FS writes.
> > > > >>> Did you try the new StorageAgent with your application? If yes, do
> > > > >>> you have any numbers to compare?
> > > > >>>
> > > > >>> Thanks
> > > > >>> - Gaurav
> > > > >>>
> > > > >>>> On Dec 2, 2015, at 9:33 AM, Ashish Tadose <[email protected]> wrote:
> > > > >>>>
> > > > >>>> The application uses a large number of in-memory dimension store
> > > > >>>> partitions to hold high-cardinality aggregated data, and many
> > > > >>>> intermediate operators also keep cached data for reference lookups,
> > > > >>>> which is non-transient.
> > > > >>>>
> > > > >>>> The total number of application partitions was more than 1000,
> > > > >>>> which means a lot of operators to checkpoint and in turn a lot of
> > > > >>>> frequent HDFS write, rename & delete operations, which became a
> > > > >>>> bottleneck.
> > > > >>>>
> > > > >>>> The application requires exactly-once semantics with idempotent
> > > > >>>> operators, which I suppose cannot be achieved through async FS
> > > > >>>> writes; please correct me if I'm wrong here.
> > > > >>>>
> > > > >>>> Also, the application computes streaming aggregations of
> > > > >>>> high-cardinality incoming data streams and the reference caches are
> > > > >>>> updated frequently, so I'm not sure how much incremental
> > > > >>>> checkpointing will help here.
> > > > >>>>
> > > > >>>> This specific application aside, I strongly think it would be good
> > > > >>>> to have a StorageAgent backed by a distributed in-memory store as
> > > > >>>> an alternative in the platform.
> > > > >>>>
> > > > >>>> Ashish
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>> On Wed, Dec 2, 2015 at 10:35 PM, Munagala Ramanath <[email protected]> wrote:
> > > > >>>>
> > > > >>>>> Ashish,
> > > > >>>>>
> > > > >>>>> In the current release, the HDFS writes are asynchronous, so I'm
> > > > >>>>> wondering if you could elaborate on how much latency you are
> > > > >>>>> observing both with and without checkpointing (i.e. after your
> > > > >>>>> changes to make operators stateless).
> > > > >>>>>
> > > > >>>>> Also, any information on how much non-transient data is being
> > > > >>>>> checkpointed in each operator would also be useful.
> > > > >>>>> There is an effort under way to implement incremental
> > > > >>>>> checkpointing, which should improve things when there is a lot of
> > > > >>>>> state but very little that changes from window to window.
> > > > >>>>>
> > > > >>>>> Ram
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> On Wed, Dec 2, 2015 at 8:51 AM, Ashish Tadose <[email protected]> wrote:
> > > > >>>>>
> > > > >>>>>> Hi All,
> > > > >>>>>>
> > > > >>>>>> Currently the Apex engine provides operator checkpointing in HDFS
> > > > >>>>>> (with the HDFS-backed StorageAgents, i.e. FSStorageAgent &
> > > > >>>>>> AsyncFSStorageAgent).
> > > > >>>>>>
> > > > >>>>>> We have observed that for applications having a large number of
> > > > >>>>>> operator instances, HDFS checkpointing introduces latency in the
> > > > >>>>>> DAG which degrades overall application performance.
> > > > >>>>>> To resolve this we had to review all operators in the DAG and
> > > > >>>>>> make a few operators stateless.
> > > > >>>>>>
> > > > >>>>>> As operator checkpointing is critical functionality of the Apex
> > > > >>>>>> streaming platform to ensure fault-tolerant behavior, the
> > > > >>>>>> platform should also provide alternate StorageAgents which will
> > > > >>>>>> work seamlessly with large applications that require exactly-once
> > > > >>>>>> semantics.
> > > > >>>>>>
> > > > >>>>>> HDFS read/write latency is limited and doesn't improve beyond a
> > > > >>>>>> certain point because of disk I/O & staging writes. Having an
> > > > >>>>>> alternate strategy for this checkpointing in a fault-tolerant
> > > > >>>>>> distributed in-memory grid would ensure that application
> > > > >>>>>> stability and performance are not impacted.
> > > > >>>>>>
> > > > >>>>>> I have developed an in-memory storage agent which I would like to
> > > > >>>>>> contribute as an alternate StorageAgent for checkpointing.
> > > > >>>>>>
> > > > >>>>>> Thanks,
> > > > >>>>>> Ashish
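[Editor's note] For readers following the thread, below is a minimal sketch of the plug-in point being discussed: a key-value-store backed StorageAgent set on the DAG through the STORAGE_AGENT attribute. The class name KeyValueStorageAgent and the in-process map standing in for the distributed store (Geode in the actual contribution) are illustrative assumptions rather than code from the pull request; the StorageAgent method signatures follow the public com.datatorrent.api interface, and the real Geode-backed agent may differ.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.datatorrent.api.StorageAgent;

/**
 * Illustrative sketch only: a StorageAgent that writes operator checkpoints
 * to a key-value store. The in-process map stands in for a remote store such
 * as a Geode region; it is NOT the contributed implementation.
 */
public class KeyValueStorageAgent implements StorageAgent, Serializable
{
  private static final long serialVersionUID = 1L;

  // Stand-in for the distributed store; key = "<operatorId>/<windowId>".
  // A real agent would hold a re-establishable client to the external store.
  private final Map<String, byte[]> store = new ConcurrentHashMap<String, byte[]>();

  private static String key(int operatorId, long windowId)
  {
    return operatorId + "/" + windowId;
  }

  @Override
  public void save(Object object, int operatorId, long windowId) throws IOException
  {
    // Serialize the operator state and store it under (operatorId, windowId).
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(bos);
    oos.writeObject(object);
    oos.close();
    store.put(key(operatorId, windowId), bos.toByteArray());
  }

  @Override
  public Object load(int operatorId, long windowId) throws IOException
  {
    byte[] bytes = store.get(key(operatorId, windowId));
    if (bytes == null) {
      throw new IOException("No checkpoint for operator " + operatorId + " window " + windowId);
    }
    ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
    try {
      return ois.readObject();
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    } finally {
      ois.close();
    }
  }

  @Override
  public void delete(int operatorId, long windowId) throws IOException
  {
    // Called by the platform to purge checkpoints that are no longer needed.
    store.remove(key(operatorId, windowId));
  }

  @Override
  public long[] getWindowIds(int operatorId) throws IOException
  {
    // List the window ids for which checkpoints exist for this operator.
    List<Long> ids = new ArrayList<Long>();
    String prefix = operatorId + "/";
    for (String k : store.keySet()) {
      if (k.startsWith(prefix)) {
        ids.add(Long.valueOf(k.substring(prefix.length())));
      }
    }
    long[] result = new long[ids.size()];
    for (int i = 0; i < result.length; i++) {
      result[i] = ids.get(i);
    }
    return result;
  }
}

Wiring it into an application mirrors the snippet quoted earlier in the thread:

  dag.setAttribute(OperatorContext.STORAGE_AGENT, new KeyValueStorageAgent());

A production agent would keep a transient connection to the external store rather than an in-process map, so that checkpoints survive container restarts.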
