Gaurav, Sandesh

Please find my comments below in *bold*.

1. Are there standard APIs for distributed In-Memory stores or is this
implementation specific to one particular tool?
*I have developed a concrete implementation with Apache Geode -
http://geode.incubator.apache.org/*
*However, for this feature contribution I am adding a KeyValue store interface
and an abstract implementation so that any KeyValue store can be plugged in as
the storage agent.*

2. Will In-Memory Store compete with DataTorrent Apps for cluster resources
(memory/cpu)?
*Probably not. The in-memory store would be a separately managed cluster, which
may not be part of the YARN environment.*

3. What is the purging policy? Who is responsible for cleaning up the
resources for completed/failed/aborted applications? This becomes important
when you want to launch an Application using previous Application Id
*The in-memory storage agent would support deleting checkpoints, which the
platform calls periodically during the application lifetime.*
*Purging the checkpoints of older applications would be taken care of by the
application developer or the admin who manages the in-memory cluster. The same
is the case with HDFS storage agents, where the user has to manually delete old
apps and checkpoint data.*

4. Which in-memory stores did you evaluate? Are they YARN compatible?
*I have a concrete implementation of a Geode storage agent, which I will be
contributing along with this feature.*
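
To make the KeyValue store idea from (1) and the delete/purge behaviour from (3) a bit more concrete, here is a minimal sketch. It is not the contributed code. Every name in it (KeyValueStore, MapStore, CheckpointStore, purgeApplication, and the appId/operatorId/windowId key layout) is hypothetical, the local map is only a stand-in for a Geode-backed store, and Java serialization stands in for whatever serializer the platform would actually use.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class KeyValueCheckpointSketch {

  /** Hypothetical minimal contract that any pluggable key-value store would satisfy. */
  public interface KeyValueStore {
    void put(String key, byte[] value);
    byte[] get(String key);
    void remove(String key);
    Iterable<String> keys();
  }

  /** Local stand-in for a distributed store; a real backend would replace this class. */
  public static class MapStore implements KeyValueStore {
    private final Map<String, byte[]> map = new ConcurrentHashMap<>();
    @Override public void put(String key, byte[] value) { map.put(key, value); }
    @Override public byte[] get(String key) { return map.get(key); }
    @Override public void remove(String key) { map.remove(key); }
    // Return a snapshot so callers can remove entries while iterating.
    @Override public Iterable<String> keys() { return new ArrayList<>(map.keySet()); }
  }

  /** Checkpoint operations layered on the store, keyed by application, operator and window. */
  public static class CheckpointStore {
    private final KeyValueStore store;
    private final String appId;

    public CheckpointStore(KeyValueStore store, String appId) {
      this.store = store;
      this.appId = appId;
    }

    private String key(int operatorId, long windowId) {
      return appId + "/" + operatorId + "/" + windowId;
    }

    /** Serializes the operator state (must be Serializable here) and stores it. */
    public void save(Object state, int operatorId, long windowId) throws IOException {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(state);
      }
      store.put(key(operatorId, windowId), bytes.toByteArray());
    }

    /** Loads a checkpoint, failing with IOException if it does not exist. */
    public Object load(int operatorId, long windowId) throws IOException {
      byte[] data = store.get(key(operatorId, windowId));
      if (data == null) {
        throw new IOException("No checkpoint for " + key(operatorId, windowId));
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
        return in.readObject();
      } catch (ClassNotFoundException e) {
        throw new IOException(e);
      }
    }

    /** Deletes one checkpoint; this is the call made periodically during the app lifetime. */
    public void delete(int operatorId, long windowId) {
      store.remove(key(operatorId, windowId));
    }

    /** Manual purge of all remaining checkpoints of this application; returns the count removed. */
    public int purgeApplication() {
      String prefix = appId + "/";
      int removed = 0;
      for (String k : store.keys()) {
        if (k.startsWith(prefix)) {
          store.remove(k);
          removed++;
        }
      }
      return removed;
    }
  }

  public static void main(String[] args) throws IOException {
    CheckpointStore checkpoints = new CheckpointStore(new MapStore(), "app_1");
    checkpoints.save("operator state", 1, 10L);
    System.out.println(checkpoints.load(1, 10L));
    System.out.println("purged: " + checkpoints.purgeApplication());
  }
}
```

Prefixing every key with the application id is one way to let an admin purge the checkpoints of an old application without touching those of a running one, which matters when an application is relaunched with a previous Application Id.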

Thanks,
Ashish


On Fri, Dec 4, 2015 at 12:45 AM, Sandesh Hegde <[email protected]>
wrote:

> Ashish,
>
> Two more questions for you,
> Which in-memory stores did you evaluate? Are they YARN compatible?
>
> Thank you for your contribution.
>
> Sandesh
>
> On Wed, Dec 2, 2015 at 10:53 AM Gaurav Gupta <[email protected]>
> wrote:
>
> > Ashish,
> >
> > I have a couple of questions:
> > 1. Are there standard APIs for distributed In-Memory stores or is this
> > implementation specific to one particular tool?
> > 2. Will In-Memory Store compete with DataTorrent Apps for cluster
> > resources (memory/cpu)?
> > 3. What is the purging policy? Who is responsible for cleaning up the
> > resources for completed/failed/aborted applications? This becomes
> > important when you want to launch an Application using a previous
> > Application Id.
> >
> > Thanks
> > - Gaurav
> >
> > > On Dec 2, 2015, at 10:07 AM, Ashish Tadose <[email protected]>
> > > wrote:
> > >
> > > Thanks Gaurav,
> > >
> > > I have finished baseline implementations of StorageAgent and also tested
> > > them with demo applications by explicitly specifying the agent in the DAG
> > > configuration as below, and it works fine.
> > >
> > > dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
> > >
> > > I also had to make some changes to StramClient to pass additional
> > > information such as applicationId, as it doesn't pass it currently.
> > >
> > > I am going to create a JIRA task for this feature and will document the
> > > design & implementation strategy there.
> > >
> > > Thx,
> > > Ashish
> > >
> > >
> > > On Wed, Dec 2, 2015 at 11:26 PM, Gaurav Gupta <[email protected]>
> > > wrote:
> > >
> > >> Just to add, you can plug in your storage agent using the attribute
> > >> STORAGE_AGENT (
> > >> https://www.datatorrent.com/docs/apidocs/com/datatorrent/api/Context.OperatorContext.html#STORAGE_AGENT
> > >> )
> > >>
> > >> Thanks
> > >> - Gaurav
> > >>
> > >>> On Dec 2, 2015, at 9:51 AM, Gaurav Gupta <[email protected]>
> > >>> wrote:
> > >>>
> > >>> Ashish,
> > >>>
> > >>> You are right that Exactly once semantics can’t be achieved through
> > >>> Async FS write.
> > >>> Did you try the new StorageAgent with your Application? If yes, do you
> > >>> have any numbers to compare?
> > >>>
> > >>> Thanks
> > >>> - Gaurav
> > >>>
> > >>>> On Dec 2, 2015, at 9:33 AM, Ashish Tadose <[email protected]> wrote:
> > >>>>
> > >>>> The application uses a large number of in-memory dimension store
> > >>>> partitions to hold high-cardinality aggregated data, and many
> > >>>> intermediate operators also keep cache data for reference lookups,
> > >>>> which is non-transient.
> > >>>>
> > >>>> Total application partitions were more than 1000, which means a lot of
> > >>>> operators to checkpoint and in turn a lot of frequent HDFS write,
> > >>>> rename & delete operations, which became the bottleneck.
> > >>>>
> > >>>> The application requires Exactly once semantics with idempotent
> > >>>> operators, which I suppose cannot be achieved through Async FS writes;
> > >>>> please correct me if I'm wrong here.
> > >>>>
> > >>>> Also, the application computes streaming aggregations of
> > >>>> high-cardinality incoming data streams, and reference caches are
> > >>>> updated frequently, so I am not sure how much incremental
> > >>>> checkpointing will help here.
> > >>>>
> > >>>> Regardless of this specific application, I strongly think it would be
> > >>>> good to have a StorageAgent backed by a distributed in-memory store as
> > >>>> an alternative in the platform.
> > >>>>
> > >>>> Ashish
> > >>>>
> > >>>>
> > >>>>
> > >>>> On Wed, Dec 2, 2015 at 10:35 PM, Munagala Ramanath <[email protected]>
> > >>>> wrote:
> > >>>>
> > >>>>> Ashish,
> > >>>>>
> > >>>>> In the current release, the HDFS writes are asynchronous, so I'm
> > >>>>> wondering if you could elaborate on how much latency you are
> > >>>>> observing both with and without checkpointing (i.e. after your
> > >>>>> changes to make operators stateless).
> > >>>>>
> > >>>>> Also, any information on how much non-transient data is being
> > >>>>> checkpointed in each operator would be useful. There is an effort
> > >>>>> under way to implement incremental checkpointing, which should
> > >>>>> improve things when there is a lot of state but very little that
> > >>>>> changes from window to window.
> > >>>>>
> > >>>>> Ram
> > >>>>>
> > >>>>>
> > >>>>> On Wed, Dec 2, 2015 at 8:51 AM, Ashish Tadose <[email protected]>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Hi All,
> > >>>>>>
> > >>>>>> Currently the Apex engine provides operator checkpointing in HDFS
> > >>>>>> (with HDFS-backed StorageAgents, i.e. FSStorageAgent &
> > >>>>>> AsyncFSStorageAgent).
> > >>>>>>
> > >>>>>> We have observed that for applications with a large number of
> > >>>>>> operator instances, HDFS checkpointing introduces latency in the
> > >>>>>> DAG, which degrades overall application performance.
> > >>>>>> To resolve this we had to review all operators in the DAG and make
> > >>>>>> a few operators stateless.
> > >>>>>>
> > >>>>>> As operator checkpointing is critical functionality of the Apex
> > >>>>>> streaming platform to ensure fault-tolerant behavior, the platform
> > >>>>>> should also provide alternate StorageAgents that work seamlessly
> > >>>>>> with large applications that require Exactly once semantics.
> > >>>>>>
> > >>>>>> HDFS read/write latency does not improve beyond a certain point
> > >>>>>> because of disk I/O & staging writes. Having an alternate strategy
> > >>>>>> of checkpointing to a fault-tolerant distributed in-memory grid
> > >>>>>> would ensure that application stability and performance are not
> > >>>>>> impacted.
> > >>>>>>
> > >>>>>> I have developed an in-memory storage agent which I would like to
> > >>>>>> contribute as an alternate StorageAgent for checkpointing.
> > >>>>>>
> > >>>>>> Thanks,
> > >>>>>> Ashish
> > >>>>>>
> > >>>>>
> > >>>
> > >>
> > >>
> >
> >
>
