Hi Thomas,

I have created the JIRA below and added a pull request for code review.
https://issues.apache.org/jira/browse/APEXCORE-283

We have also developed input & output operators for Geode, which we will be
contributing soon.

Thx,
Ashish


On Sat, Dec 5, 2015 at 12:02 AM, Thomas Weise <[email protected]>
wrote:

> Hi Ashish,
>
> Nice, looking forward to seeing the support for Geode! Have you created any
> JIRAs for your work yet?
>
> Just in case you have not seen it yet, here are the contributor guidelines:
> http://apex.incubator.apache.org/contributing.html
>
> Thanks,
> Thomas
>
> On Fri, Dec 4, 2015 at 9:49 AM, Ashish Tadose <[email protected]>
> wrote:
>
> > Gaurav, Sandesh
> >
> > PFB my comments in *bold*
> >
> > 1. Are there standard APIs for distributed In-Memory stores or is this
> > implementation specific to one particular tool?
> > *I have developed a concrete implementation with Apache Geode -
> > http://geode.incubator.apache.org/*
> > *However, for this feature contribution I am adding a KeyValue store
> > interface and an abstract implementation so that any KeyValue store can
> > be plugged in as a storage agent.*
> >
> > 2. Will In-Memory Store compete with DataTorrent Apps for cluster
> > resources (memory/cpu)?
> > *Probably not. The in-memory store would be a separately managed cluster
> > which may not be part of the YARN environment.*
> >
> > 3. What is the purging policy? Who is responsible for cleaning up the
> > resources for completed/failed/aborted applications? This becomes
> > important when you want to launch an Application using a previous
> > Application Id.
> > *In-memory storage would support deleting checkpoints, which the platform
> > calls periodically during the application lifetime.*
> > *Purging the checkpoints of older applications will be taken care of by
> > the application developer or the admin who is managing the in-memory
> > cluster. The same is the case with HDFS storage agents, where users have
> > to manually delete old apps and checkpoint data.*
> >
> > 4. Which in-memory stores did you evaluate? Are they YARN compatible?
> > *I have a concrete implementation of a Geode storage agent, which I will be
> > contributing along with this feature.*
> >
> > Thanks,
> > Ashish
> >
> >
> > On Fri, Dec 4, 2015 at 12:45 AM, Sandesh Hegde <[email protected]>
> > wrote:
> >
> > > Ashish,
> > >
> > > Two more questions for you,
> > > Which in-memory stores did you evaluate? Are they YARN compatible?
> > >
> > > Thank you for your contribution.
> > >
> > > Sandesh
> > >
> > > On Wed, Dec 2, 2015 at 10:53 AM Gaurav Gupta <[email protected]>
> > > wrote:
> > >
> > > > Ashish,
> > > >
> > > > > I have a couple of questions:
> > > > > 1. Are there standard APIs for distributed In-Memory stores or is
> > > > > this implementation specific to one particular tool?
> > > > > 2. Will In-Memory Store compete with DataTorrent Apps for cluster
> > > > > resources (memory/cpu)?
> > > > > 3. What is the purging policy? Who is responsible for cleaning up the
> > > > > resources for completed/failed/aborted applications? This becomes
> > > > > important when you want to launch an Application using a previous
> > > > > Application Id.
> > > >
> > > > Thanks
> > > > - Gaurav
> > > >
> > > > > > On Dec 2, 2015, at 10:07 AM, Ashish Tadose <[email protected]>
> > > > > > wrote:
> > > > >
> > > > > Thanks Gaurav,
> > > > >
> > > > > > I have finished a baseline implementation of the StorageAgent and
> > > > > > also tested it with demo applications by explicitly specifying it
> > > > > > in the DAG configuration as below, and it works fine.
> > > > >
> > > > > dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
> > > > >
> > > > > > I also had to make some changes to StramClient to pass additional
> > > > > > information such as the applicationId, as it does not pass it currently.
> > > > >
> > > > > > I am going to create a JIRA task for this feature and will document
> > > > > > the design & implementation strategy there.
> > > > >
> > > > > Thx,
> > > > > > Ashish
> > > > >
> > > > >
> > > > > > On Wed, Dec 2, 2015 at 11:26 PM, Gaurav Gupta <[email protected]>
> > > > > > wrote:
> > > > >
> > > > > >> Just to add, you can plug in your storage agent using the attribute
> > > > > >> STORAGE_AGENT (
> > > > > >> https://www.datatorrent.com/docs/apidocs/com/datatorrent/api/Context.OperatorContext.html#STORAGE_AGENT
> > > > > >> )
> > > > >>
> > > > >> Thanks
> > > > >> - Gaurav
> > > > >>
> > > > > >>> On Dec 2, 2015, at 9:51 AM, Gaurav Gupta <[email protected]>
> > > > > >>> wrote:
> > > > >>>
> > > > >>> Ashish,
> > > > >>>
> > > > > >>> You are right that exactly-once semantics can’t be achieved through
> > > > > >>> async FS writes.
> > > > > >>> Did you try the new StorageAgent with your application? If yes, do
> > > > > >>> you have any numbers to compare?
> > > > >>>
> > > > >>> Thanks
> > > > >>> - Gaurav
> > > > >>>
> > > > > >>>> On Dec 2, 2015, at 9:33 AM, Ashish Tadose <[email protected]
> > > > > >>>> <mailto:[email protected]>> wrote:
> > > > >>>>
> > > > > >>>> The application uses a large number of in-memory dimension store
> > > > > >>>> partitions to hold high-cardinality aggregated data, and many
> > > > > >>>> intermediate operators also keep cached data for reference lookups,
> > > > > >>>> which is non-transient.
> > > > >>>>
> > > > > >>>> Total application partitions were more than 1000, which means a lot
> > > > > >>>> of operators to checkpoint and, in turn, a lot of frequent HDFS
> > > > > >>>> write, rename & delete operations, which became the bottleneck.
> > > > >>>>
> > > > > >>>> The application requires exactly-once semantics with idempotent
> > > > > >>>> operators, which I suppose cannot be achieved through async FS
> > > > > >>>> writes; please correct me if I'm wrong here.
> > > > >>>>
> > > > > >>>> Also, the application computes streaming aggregations of
> > > > > >>>> high-cardinality incoming data streams, and reference caches are
> > > > > >>>> updated frequently, so I am not sure how much incremental
> > > > > >>>> checkpointing will help here.
> > > > >>>>
> > > > > >>>> Apart from this specific application, I strongly think it would be
> > > > > >>>> good to have a StorageAgent backed by a distributed in-memory store
> > > > > >>>> as an alternative in the platform.
> > > > >>>>
> > > > >>>> Ashish
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > > >>>> On Wed, Dec 2, 2015 at 10:35 PM, Munagala Ramanath <[email protected]
> > > > > >>>> <mailto:[email protected]>>
> > > > > >>>> wrote:
> > > > >>>>
> > > > >>>>> Ashish,
> > > > >>>>>
> > > > > >>>>> In the current release, the HDFS writes are asynchronous, so I'm
> > > > > >>>>> wondering if you could elaborate on how much latency you are
> > > > > >>>>> observing both with and without checkpointing (i.e. after your
> > > > > >>>>> changes to make operators stateless).
> > > > >>>>>
> > > > > >>>>> Also, any information on how much non-transient data is being
> > > > > >>>>> checkpointed in each operator would be useful. There is an effort
> > > > > >>>>> under way to implement incremental checkpointing, which should
> > > > > >>>>> improve things when there is a lot of state but very little that
> > > > > >>>>> changes from window to window.
> > > > >>>>>
> > > > >>>>> Ram
> > > > >>>>>
> > > > >>>>>
> > > > > >>>>> On Wed, Dec 2, 2015 at 8:51 AM, Ashish Tadose <[email protected]
> > > > > >>>>> <mailto:[email protected]>>
> > > > > >>>>> wrote:
> > > > >>>>>
> > > > >>>>>> Hi All,
> > > > >>>>>>
> > > > > >>>>>> Currently the Apex engine provides operator checkpointing in HDFS
> > > > > >>>>>> (with HDFS-backed StorageAgents, i.e. FSStorageAgent &
> > > > > >>>>>> AsyncFSStorageAgent).
> > > > >>>>>>
> > > > > >>>>>> We have observed that for applications with a large number of
> > > > > >>>>>> operator instances, HDFS checkpointing introduces latency in the
> > > > > >>>>>> DAG, which degrades overall application performance.
> > > > > >>>>>> To resolve this we had to review all operators in the DAG and make
> > > > > >>>>>> a few operators stateless.
> > > > >>>>>>
> > > > > >>>>>> As operator checkpointing is critical functionality of the Apex
> > > > > >>>>>> streaming platform for ensuring fault-tolerant behavior, the
> > > > > >>>>>> platform should also provide alternate StorageAgents which work
> > > > > >>>>>> seamlessly with large applications that require exactly-once
> > > > > >>>>>> semantics.
> > > > >>>>>>
> > > > > >>>>>> HDFS read/write latency is limited and doesn't improve beyond a
> > > > > >>>>>> certain point because of disk I/O & staging writes. Having an
> > > > > >>>>>> alternate strategy that checkpoints to a fault-tolerant distributed
> > > > > >>>>>> in-memory grid would ensure that application stability and
> > > > > >>>>>> performance are not impacted.
> > > > >>>>>>
> > > > > >>>>>> I have developed an in-memory storage agent which I would like to
> > > > > >>>>>> contribute as an alternate StorageAgent for checkpointing.
> > > > >>>>>>
> > > > >>>>>> Thanks,
> > > > >>>>>> Ashish
> > > > >>>>>>
> > > > >>>>>
> > > > >>>
> > > > >>
> > > > >>
> > > >
> > > >
> > >
> >
>
