I can't imagine that Flink would synchronize writes to the Redis cluster in
some way such that two competing writes don't impact each other but I would
need to defer to Flink folks to answer that.
For example if you wrote this code inside your DoFns:
processElement(...) {
string value = read from redis
value += "a";
write to redis (value)
}
and was processing 100,000 elements that value would be a string 100,000
characters longs.
For example, processing a bundle may fail and you could get a string
greater than 100,000 characters long.
Or you could process in parallel where both DoFn's read "aaa" and write
back "aaaa" and then you missed appending an "a".
I could imagine that if you are very careful and use append/increment/check
and set style operations you could maintain consistency but if a bundle
fails those affects would be applied multiple times.
On Tue, Jun 21, 2016 at 12:38 PM, amir bahmanyari <[email protected]>
wrote:
> Thanks Lukas.
> I am executing my fat jar Beam app in a Flink Cluster (2-nodes for now).
> I assume the Job Manager<--->Task manager(s) provide visibility to the
> in-memory db contents to all (ParDo) processes running on both nodes
> executing separate DoFn at the same time.
> Therefore, the "shared data" are synchronized/locked while one node
> process is making changes to it.
> I use one instance of Redis for one set of data (accessed by both nodes
> DoFn processes) & a concurrentHashMap for another set of data
> I assume FlinkCluster maintains the thread safety of Redis &
> concurrentHashMap objects.
> Is this the right assumption? .
> Thanks again.
> Amir-
>
>
> ------------------------------
> *From:* Lukasz Cwik <[email protected]>
> *To:* [email protected]; amir bahmanyari <[email protected]>
>
> *Sent:* Monday, June 20, 2016 4:10 PM
>
> *Subject:* Re: Multi-threading implementation equivalence in Beam
>
> You are correct, an in memory database is outside the scope/knowledge of
> the runner and it will not be able to move any side effects over. For
> example, lets say your processing some elements X, Y, Z in one bundle on
> one machine and processing Y fails. The bundle may be retried on another
> machine where your changes to X may not exist. Or the bundle may be split
> such that X and Z is processed on one machine and Y on yet another machine.
>
> If the reason for using an in-memory database is just caching and you can
> reload/recreate the cached entries than this should be fine, you'll just
> suffer cache misses elsewhere.
> If the reason is for caching previously seen elements which you will
> output later or write like side effects than this can disappear if the
> bundle processing is moved to another machine.
>
> Its not that in memory databases can't be used, they just can't be relied
> on persistent state.
>
> On Mon, Jun 20, 2016 at 3:36 PM, amir bahmanyari <[email protected]>
> wrote:
>
> Wonderful. Thanks Lukaz.
> Have one question. The statement in that page "In addition, *your DoFn
> should not rely on any persistent state from invocation to invocation.*".
> I am using in-memory db such as Redis or Aerospike for intermediate look
> ups etc.
> Is this what the above statement referring to: dont use in-memory dbs?
> Thanks again.
>
>
> ------------------------------
> *From:* Lukasz Cwik <[email protected]>
> *To:* [email protected]; amir bahmanyari <[email protected]>
>
> *Sent:* Monday, June 20, 2016 1:08 PM
>
> *Subject:* Re: Multi-threading implementation equivalence in Beam
>
> Threading/parallelism is up to the runner and does not map 1-1 to the java
> memory model since most runners will execute in a distributed manner. In
> general, runners will attempt to break up the work as evenly as possible
> and schedule work across multiple machines / cpu cores at all times to
> maximize the throughput / minimize time for execution of the job This is
> abstracted away much by getting users to write DoFns that apply with ParDo.
> Please take a look at this explanation about ParDo (
> https://cloud.google.com/dataflow/model/par-do) to get a better
> understanding of its usage and as a place to look at some examples.
>
> On Mon, Jun 20, 2016 at 12:44 PM, amir bahmanyari <[email protected]>
> wrote:
>
> Thanks JB.
> I am executing FlinkPipelineRunner...& later will experirnt the same with
> SparkRunner....any examples pls?
> Cheers
>
> ------------------------------
> *From:* Jean-Baptiste Onofré <[email protected]>
> *To:* [email protected]
> *Sent:* Monday, June 20, 2016 12:35 PM
> *Subject:* Re: Multi-threading implementation equivalence in Beam
>
> Hi Amir,
>
> the DirectPipelineRunner uses multi-thread to achieve ParDo execution
> for instance.
>
> You mean example of Beam pipeline code ? Or example of runner ?
>
> Regards
> JB
>
> On 06/20/2016 09:25 PM, amir bahmanyari wrote:
> > Hi Colleagues,
> > Hope you all had a great weekend. Another novice question :-)
> > Is there a pipeline parallelism/threading model provide by Beam that
> > equates the multi-threading model in Java for instance?
> > Any examples if so?
> > Thanks again,
> > Amir
>
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
>
>
>
>
>
>
>
>
>