That would be part of the Beam model and is currently unimplemented. Progress is tracked here: https://issues.apache.org/jira/browse/BEAM-25
On Wed, Jun 22, 2016 at 12:09 PM, amir bahmanyari <[email protected]> wrote:

> Thanks Lukasz & Aljoscha.
> Below is a slightly old link (July 2015). Has there been any progress on
> out-of-core state?
> It seems like pretty compelling support for in-memory state management.
>
> https://cwiki.apache.org/confluence/display/FLINK/Stateful+Stream+Processing
>
> Is this specific to the Flink runner, or does Beam support this regardless of
> the runner, i.e. can we achieve the same with the Spark runner as well?
>
> Thanks+regards
> Amir-
>
> ------------------------------
> *From:* Aljoscha Krettek <[email protected]>
> *To:* [email protected]; amir bahmanyari <[email protected]>
> *Sent:* Wednesday, June 22, 2016 2:18 AM
> *Subject:* Re: Multi-threading implementation equivalence in Beam
>
> Lukasz is of course correct in assuming that Flink does nothing to
> synchronize accesses to Redis (or any other external system, for that
> matter).
>
> On Wed, 22 Jun 2016 at 01:15 Lukasz Cwik <[email protected]> wrote:
>
> I can't imagine that Flink would synchronize writes to the Redis cluster
> in such a way that two competing writes don't impact each other, but I
> would need to defer to the Flink folks to answer that.
>
> For example, if you wrote this code inside your DoFns:
>
>     processElement(...) {
>       String value = read from redis
>       value += "a";
>       write to redis (value)
>     }
>
> and were processing 100,000 elements, you would expect the value to be a
> string 100,000 characters long. However, processing a bundle may fail and
> be retried, and you could get a string longer than 100,000 characters.
> Or you could process in parallel, where two DoFns both read "aaa" and write
> back "aaaa", and then you have missed appending an "a".
>
> I could imagine that if you are very careful and use
> append/increment/check-and-set style operations you could maintain
> consistency, but if a bundle fails those effects would be applied multiple
> times.
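Lukasz's Redis example above can be reproduced in a single JVM. This is a minimal sketch that assumes a `ConcurrentHashMap` as a stand-in for the external store. No Redis client, Beam, or Flink API is involved, and the class and method names are hypothetical.

The sketch does three things:
- It replays the lost update as two interleaved read-then-write sequences.
- It shows that a single atomic per-key update (`ConcurrentHashMap.merge`) loses no appends under parallelism.
- It shows that a retried bundle still re-applies the appends of the failed attempt.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy model of the Redis example: a ConcurrentHashMap stands in for the external
// store (assumption). No Redis, Beam, or Flink APIs are used.
public class ExternalStoreRace {

    // Atomic per-key append: ConcurrentHashMap.merge performs the whole update atomically.
    static void atomicAppend(ConcurrentHashMap<String, String> store, String key) {
        store.merge(key, "a", String::concat);
    }

    // Deterministic replay of the race: two workers both read "aaa" before either writes.
    static String lostUpdate() {
        ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();
        store.put("k", "aaa");
        String readByWorker1 = store.get("k"); // "read from redis" on worker 1
        String readByWorker2 = store.get("k"); // "read from redis" on worker 2
        store.put("k", readByWorker1 + "a");   // worker 1 writes "aaaa"
        store.put("k", readByWorker2 + "a");   // worker 2 also writes "aaaa"
        return store.get("k");                 // "aaaa": one append was lost
    }

    // Many threads appending atomically: the final length equals the element count.
    static int parallelAtomicLength(int elements, int threads) {
        ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < elements; i++) {
            pool.submit(() -> atomicAppend(store, "k"));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("appends did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return store.getOrDefault("k", "").length();
    }

    // A bundle fails after `failAfter` appends and is retried from the start.
    // Atomicity does not help: the failed attempt's appends are already in the store.
    static int retriedBundleLength(int elements, int failAfter) {
        ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();
        for (int i = 0; i < failAfter; i++) { // first attempt, then a simulated failure
            atomicAppend(store, "k");
        }
        for (int i = 0; i < elements; i++) {  // the retry replays the whole bundle
            atomicAppend(store, "k");
        }
        return store.getOrDefault("k", "").length();
    }

    public static void main(String[] args) {
        System.out.println(lostUpdate());                       // aaaa
        System.out.println(parallelAtomicLength(10_000, 8));    // 10000
        System.out.println(retriedBundleLength(10_000, 4_000)); // 14000
    }
}
```

The atomic update removes the lost-update race. As the message above points out, though, it does nothing about effects that are applied again when a bundle is retried.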
>
> On Tue, Jun 21, 2016 at 12:38 PM, amir bahmanyari <[email protected]>
> wrote:
>
> Thanks Lukasz.
> I am executing my fat-jar Beam app in a Flink cluster (two nodes for now).
> I assume the Job Manager <---> Task Manager(s) provide visibility of the
> in-memory DB contents to all (ParDo) processes running on both nodes and
> executing separate DoFns at the same time.
> Therefore, the "shared data" are synchronized/locked while one node's
> process is making changes to it.
> I use one instance of Redis for one set of data (accessed by both nodes'
> DoFn processes) and a ConcurrentHashMap for another set of data.
> I assume the Flink cluster maintains the thread safety of the Redis and
> ConcurrentHashMap objects.
> Is this the right assumption?
> Thanks again.
> Amir-
>
> ------------------------------
> *From:* Lukasz Cwik <[email protected]>
> *To:* [email protected]; amir bahmanyari <[email protected]>
> *Sent:* Monday, June 20, 2016 4:10 PM
> *Subject:* Re: Multi-threading implementation equivalence in Beam
>
> You are correct: an in-memory database is outside the scope/knowledge of
> the runner, and the runner will not be able to move any side effects over.
> For example, let's say you're processing some elements X, Y, Z in one bundle
> on one machine and processing Y fails. The bundle may be retried on another
> machine where your changes for X may not exist. Or the bundle may be split
> such that X and Z are processed on one machine and Y on yet another machine.
>
> If the reason for using an in-memory database is just caching, and you can
> reload/recreate the cached entries, then this should be fine; you'll just
> suffer cache misses elsewhere.
> If the reason is caching previously seen elements which you will output
> later, or writing side effects, then that state can disappear if the
> bundle processing is moved to another machine.
>
> It's not that in-memory databases can't be used; they just can't be relied
> on as persistent state.
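Lukasz distinguishes two uses of an in-memory database: a cache that can be recomputed, and elements held only in one machine's memory. Two simulated workers make the difference visible. This is a toy model under stated assumptions:
- Each `Worker`'s `HashMap`/`ArrayList` stands in for an in-memory DB on one machine.
- `expensiveLookup` is a hypothetical deterministic lookup.
- Worker A being lost is modeled simply by never flushing it.

None of this is Beam code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (not Beam code): each Worker's collections stand in for an
// in-memory DB living on one machine.
public class WorkerLocalState {

    static final class Worker {
        final Map<String, Integer> cache = new HashMap<>();
        final List<String> buffered = new ArrayList<>();
    }

    // Hypothetical deterministic lookup; any worker can recompute it after a cache miss.
    static int expensiveLookup(String key) {
        return key.length();
    }

    // Safe use: a cold cache on another worker only costs a miss; the result is identical.
    static int enrich(Worker worker, String element) {
        return worker.cache.computeIfAbsent(element, WorkerLocalState::expensiveLookup);
    }

    // Unsafe use: the element lives only in this worker's memory until a later flush.
    static void bufferForLater(Worker worker, String element) {
        worker.buffered.add(element);
    }

    // Emits and clears whatever this particular worker has buffered.
    static List<String> flush(Worker worker) {
        List<String> out = new ArrayList<>(worker.buffered);
        worker.buffered.clear();
        return out;
    }

    public static void main(String[] args) {
        Worker a = new Worker();
        Worker b = new Worker();

        // Caching: a warm cache on A and a cold cache on B give the same answer.
        enrich(a, "X");
        System.out.println(enrich(a, "X") == enrich(b, "X")); // true

        // Buffering: X is processed on A and only buffered there. A is then lost
        // (never flushed) and Y, Z land on B, so X never reaches the output.
        bufferForLater(a, "X");
        bufferForLater(b, "Y");
        bufferForLater(b, "Z");
        System.out.println(flush(b)); // [Y, Z]
    }
}
```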
>
> On Mon, Jun 20, 2016 at 3:36 PM, amir bahmanyari <[email protected]>
> wrote:
>
> Wonderful. Thanks Lukasz.
> I have one question. That page says: "In addition, *your DoFn
> should not rely on any persistent state from invocation to invocation.*"
> I am using in-memory DBs such as Redis or Aerospike for intermediate
> lookups etc.
> Is this what the above statement is referring to: don't use in-memory DBs?
> Thanks again.
>
> ------------------------------
> *From:* Lukasz Cwik <[email protected]>
> *To:* [email protected]; amir bahmanyari <[email protected]>
> *Sent:* Monday, June 20, 2016 1:08 PM
> *Subject:* Re: Multi-threading implementation equivalence in Beam
>
> Threading/parallelism is up to the runner and does not map 1-1 to the Java
> memory model, since most runners execute in a distributed manner. In
> general, runners attempt to break up the work as evenly as possible
> and schedule work across multiple machines / CPU cores at all times to
> maximize throughput / minimize the execution time of the job. This is
> largely abstracted away by having users write DoFns that are applied
> with ParDo.
> Please take a look at this explanation of ParDo (
> https://cloud.google.com/dataflow/model/par-do) to get a better
> understanding of its usage and to see some examples.
>
> On Mon, Jun 20, 2016 at 12:44 PM, amir bahmanyari <[email protected]>
> wrote:
>
> Thanks JB.
> I am executing the FlinkPipelineRunner... and later will experiment with
> the same on the SparkRunner... any examples, please?
> Cheers
>
> ------------------------------
> *From:* Jean-Baptiste Onofré <[email protected]>
> *To:* [email protected]
> *Sent:* Monday, June 20, 2016 12:35 PM
> *Subject:* Re: Multi-threading implementation equivalence in Beam
>
> Hi Amir,
>
> the DirectPipelineRunner uses multiple threads to achieve ParDo execution,
> for instance.
>
> Do you mean an example of Beam pipeline code? Or an example of a runner?
>
> Regards
> JB
>
> On 06/20/2016 09:25 PM, amir bahmanyari wrote:
> > Hi Colleagues,
> > Hope you all had a great weekend. Another novice question :-)
> > Is there a pipeline parallelism/threading model provided by Beam that
> > is equivalent to the multi-threading model in Java, for instance?
> > Any examples if so?
> > Thanks again,
> > Amir
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
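The ParDo discussion above says runners split work into bundles and schedule it across threads or machines. A toy single-JVM model illustrates the idea. `parDo` here is a hypothetical helper, not Beam's `ParDo` transform. It does not reproduce how the DirectPipelineRunner, Flink, or Spark actually schedule work. It only shows that when the per-element function depends solely on its input, the output does not depend on bundle size or thread count.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Toy model of a runner executing a ParDo-like step in one JVM; not the Beam API.
public class ToyParDo {

    // Hypothetical helper: splits the input into bundles and processes each bundle
    // as an independent task on a fixed thread pool.
    static <I, O> List<O> parDo(List<I> input, Function<I, O> fn, int bundleSize, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<List<O>>> bundles = new ArrayList<>();
            for (int start = 0; start < input.size(); start += bundleSize) {
                List<I> bundle = input.subList(start, Math.min(start + bundleSize, input.size()));
                // The pool, not the user code, decides which thread runs each bundle.
                bundles.add(pool.submit(() -> {
                    List<O> out = new ArrayList<>();
                    for (I element : bundle) {
                        out.add(fn.apply(element));
                    }
                    return out;
                }));
            }
            List<O> result = new ArrayList<>();
            for (Future<List<O>> f : bundles) {
                result.addAll(f.get()); // concatenated in input order only for easy comparison
            }
            return result;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            input.add(i);
        }
        // A pure function: the output depends only on the element, not on thread or bundle.
        List<Integer> parallel = parDo(input, x -> x * x, 3, 4);
        List<Integer> sequential = parDo(input, x -> x * x, input.size(), 1);
        System.out.println(parallel.equals(sequential)); // true
        System.out.println(parallel); // [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
    }
}
```

The results are concatenated in input order only to keep the comparison simple. A Beam PCollection carries no ordering guarantee.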
