Thanks Gaurav, I have finished a baseline implementation of the StorageAgent and tested it with demo applications by explicitly specifying it in the DAG configuration as below, and it works fine.
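To make the attribute-based plugin mechanism concrete, here is a minimal self-contained sketch. Note that DagConfig and the StorageAgent interface below are simplified stand-ins invented for illustration, not the real com.datatorrent.api types; only the attribute-lookup pattern itself is what the engine does.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for a StorageAgent; the real interface lives in
// com.datatorrent.api and also handles delete and window enumeration.
interface StorageAgent {
    void save(byte[] state, int operatorId, long windowId);
    byte[] load(int operatorId, long windowId);
}

// Simplified stand-in for the DAG's attribute map; the real call is
// dag.setAttribute(OperatorContext.STORAGE_AGENT, agent).
class DagConfig {
    static final String STORAGE_AGENT = "STORAGE_AGENT"; // stand-in attribute key

    private final Map<String, Object> attributes = new HashMap<>();

    void setAttribute(String key, Object value) {
        attributes.put(key, value);
    }

    // Mirrors how the engine resolves the agent at launch time:
    // use the configured agent if one was set, otherwise fall back
    // to the platform default (e.g. an HDFS-backed agent).
    StorageAgent resolveStorageAgent(StorageAgent defaultAgent) {
        Object configured = attributes.get(STORAGE_AGENT);
        return (configured != null) ? (StorageAgent) configured : defaultAgent;
    }
}
```

The point of the pattern is that the engine never hard-codes a checkpoint backend; it just looks up whatever agent the application registered under the attribute key.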
dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);

I also had to make some changes to StramClient to pass additional information such as the applicationId, since it is not currently passed. I am going to create a JIRA task for this feature and will document the design & implementation strategy there.

Thx,
Ashish

On Wed, Dec 2, 2015 at 11:26 PM, Gaurav Gupta <[email protected]> wrote:

> Just to add, you can plug in your storage agent using the attribute
> STORAGE_AGENT (
> https://www.datatorrent.com/docs/apidocs/com/datatorrent/api/Context.OperatorContext.html#STORAGE_AGENT
> )
>
> Thanks
> - Gaurav
>
>
> On Dec 2, 2015, at 9:51 AM, Gaurav Gupta <[email protected]> wrote:
> >
> > Ashish,
> >
> > You are right that exactly-once semantics can't be achieved through
> > async FS writes.
> > Did you try the new StorageAgent with your application? If yes, do
> > you have any numbers to compare?
> >
> > Thanks
> > - Gaurav
> >
> >> On Dec 2, 2015, at 9:33 AM, Ashish Tadose <[email protected]> wrote:
> >>
> >> The application uses a large number of in-memory dimension store
> >> partitions to hold high-cardinality aggregated data, and many
> >> intermediate operators also keep cached data for reference lookups,
> >> which is non-transient.
> >>
> >> The total number of application partitions was more than 1000, which
> >> means a lot of operators to checkpoint and in turn a lot of frequent
> >> HDFS write, rename & delete operations, which became a bottleneck.
> >>
> >> The application requires exactly-once semantics with idempotent
> >> operators, which I suppose cannot be achieved through async FS
> >> writes; please correct me if I'm wrong here.
> >>
> >> Also, the application computes streaming aggregations over
> >> high-cardinality incoming data streams and the reference caches are
> >> updated frequently, so I am not sure how much incremental
> >> checkpointing will help here.
> >>
> >> Despite this specific application, I strongly think it would be good
> >> to have a StorageAgent backed by a distributed in-memory store as an
> >> alternative in the platform.
> >>
> >> Ashish
> >>
> >>
> >>
> >> On Wed, Dec 2, 2015 at 10:35 PM, Munagala Ramanath <[email protected]> wrote:
> >>
> >>> Ashish,
> >>>
> >>> In the current release, the HDFS writes are asynchronous, so I'm
> >>> wondering if you could elaborate on how much latency you are
> >>> observing both with and without checkpointing (i.e. after your
> >>> changes to make operators stateless).
> >>>
> >>> Also, any information on how much non-transient data is being
> >>> checkpointed in each operator would be useful. There is an effort
> >>> under way to implement incremental checkpointing, which should
> >>> improve things when there is a lot of state but very little that
> >>> changes from window to window.
> >>>
> >>> Ram
> >>>
> >>>
> >>> On Wed, Dec 2, 2015 at 8:51 AM, Ashish Tadose <[email protected]> wrote:
> >>>
> >>>> Hi All,
> >>>>
> >>>> Currently the Apex engine provides operator checkpointing in HDFS
> >>>> (with the HDFS-backed StorageAgents, i.e. FSStorageAgent &
> >>>> AsyncFSStorageAgent).
> >>>>
> >>>> We have observed that for applications with a large number of
> >>>> operator instances, HDFS checkpointing introduces latency in the
> >>>> DAG, which degrades overall application performance.
> >>>> To resolve this we had to review all operators in the DAG and make
> >>>> a few operators stateless.
> >>>>
> >>>> As operator checkpointing is critical functionality of the Apex
> >>>> streaming platform to ensure fault-tolerant behavior, the platform
> >>>> should also provide alternate StorageAgents which will work
> >>>> seamlessly with large applications that require exactly-once
> >>>> semantics.
> >>>>
> >>>> HDFS read/write latency is limited and doesn't improve beyond a
> >>>> certain point because of disk I/O & staging writes. Having an
> >>>> alternate strategy that checkpoints into a fault-tolerant
> >>>> distributed in-memory grid would ensure that application stability
> >>>> and performance are not impacted.
> >>>>
> >>>> I have developed an in-memory storage agent which I would like to
> >>>> contribute as an alternate StorageAgent for checkpointing.
> >>>>
> >>>> Thanks,
> >>>> Ashish
> >>>>
> >>>
> >
> >
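The in-memory agent proposed in the thread could look something like the following single-JVM sketch. All names here are assumptions for illustration: the CheckpointStore interface only loosely mirrors the shape of Apex's StorageAgent (save/load/delete plus window enumeration), and a production implementation would serialize operator state into a replicated, fault-tolerant in-memory grid rather than a plain map in one process.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical simplified interface; not the real com.datatorrent.api.StorageAgent.
interface CheckpointStore {
    void save(byte[] state, int operatorId, long windowId);
    byte[] load(int operatorId, long windowId); // null if no such checkpoint
    void delete(int operatorId, long windowId);
    long[] getWindowIds(int operatorId);        // ascending window ids
}

class InMemoryCheckpointStore implements CheckpointStore {
    // operatorId -> (windowId -> serialized state); windows are kept sorted
    // so committed checkpoints can be purged oldest-first.
    private final Map<Integer, ConcurrentSkipListMap<Long, byte[]>> store =
            new ConcurrentHashMap<>();

    @Override
    public void save(byte[] state, int operatorId, long windowId) {
        store.computeIfAbsent(operatorId, k -> new ConcurrentSkipListMap<>())
             .put(windowId, state.clone()); // defensive copy of the snapshot
    }

    @Override
    public byte[] load(int operatorId, long windowId) {
        Map<Long, byte[]> windows = store.get(operatorId);
        byte[] state = (windows == null) ? null : windows.get(windowId);
        return (state == null) ? null : state.clone();
    }

    @Override
    public void delete(int operatorId, long windowId) {
        Map<Long, byte[]> windows = store.get(operatorId);
        if (windows != null) {
            windows.remove(windowId);
        }
    }

    @Override
    public long[] getWindowIds(int operatorId) {
        ConcurrentSkipListMap<Long, byte[]> windows = store.get(operatorId);
        if (windows == null) {
            return new long[0];
        }
        return windows.keySet().stream().mapToLong(Long::longValue).toArray();
    }
}
```

The appeal versus HDFS-backed agents is that save/load become memory-speed map operations with no disk I/O or rename/staging steps; the hard part, which this sketch deliberately omits, is making the store itself replicated and fault-tolerant so a container loss does not lose checkpoints.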
