Hi Amir,

There seems to be a bit of a rathole here that is hard to talk through over email.
Are you willing to share a runnable test? Ideally you would send a tarball or post a runnable pipeline on GitHub along with clear instructions to reproduce. Assume we can set up our own local Kafka node.

Dan

On Tue, Aug 9, 2016 at 9:24 PM, amir bahmanyari <[email protected]> wrote:

> Hi Thomas,
> I removed the KafkaIO() call and replaced it with TextIO() reading data records from the file system.
> *Works perfectly* :-( Not sure whether to be happy or sad... all this time I proved Kafka itself was not sending duplicate records.
> But it seems like KafkaIO() has a brain of its own.
>
> Bottom line: the difference is KafkaIO(). It's probably intermittently sending duplicates which I could not catch during my testing.
>
> Can anyone suggest a way to prevent KafkaIO() from re-sending to processElement(), please?
> Thanks.
>
> p.apply(TextIO.Read.from("/tmp/10m1x1K.dat"))
>     .apply("PseudoLRDoFn", ParDo.of(new DoFn<String, String>() {
>
> ------------------------------
> *From:* Thomas Groh <[email protected]>
> *To:* [email protected]; amir bahmanyari <[email protected]>
> *Sent:* Tuesday, August 9, 2016 1:22 PM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> ConcurrentHashMaps can be interacted with in a way that does not preserve the intended semantics. If you are using exclusively atomic mutation operations (putIfAbsent(K, V), remove(K, V), replace(K, V, V)), you can ensure that the mutation semantics are obtained; however, using a ConcurrentMap purely like a map can cause time-of-check-to-time-of-use errors. Otherwise, ConcurrentMaps provide happens-before and visibility guarantees only. (A sketch of the atomic pattern appears at the end of this thread.)
>
> For the second question, this is mainly about interacting with mutable per-element state - if you interact with, for example, mutable instance fields that have a base and a current state, the base state must be reset per-element. It doesn't sound like this is your problem.
>
> On Tue, Aug 9, 2016 at 11:31 AM, amir bahmanyari <[email protected]> wrote:
>
> Hi Thomas,
> I spent time digesting all of this, and I think I understand it to a good extent.
> The only hang-up I still have is controlling the execution trajectory with persisted state, which you say is not guaranteed in Beam.
> I have some further questions (*Q*) below & appreciate your valuable time in responding to them. I reiterated your statements in " " for quick reference above them.
>
> "We do not encourage sharing objects between DoFn instances, and any shared state must be accessed in a thread-safe manner, and modifications to shared state should be idempotent, as otherwise retries and speculative execution may cause that state to be inconsistent."
> *Q*: I persisted state in (single-instance) Redis. I got varying results on each run.
> I then replaced Redis with Java (static) ConcurrentHashMaps, which are automatically thread safe. Interestingly enough, the very first run after this change produced the precise result & I thought I GOT IT! Re-run, and I got varying results again, up to this moment as I am typing this email. How would you suggest satisfying "any shared state must be accessed in a thread-safe manner" other than by using ConcurrentHashMaps?
>
> "A DoFn will be reused for multiple elements across a single bundle, and may be reused across multiple bundles - if you require the DoFn to be "fresh" per element, it should perform any required setup at the start of the ProcessElement method."
> *Q*: What do you suggest for "it should perform any required setup at the start of the ProcessElement method"?
> I can think of persisting the DoFn object's hash code at the object class level (every time ProcessElement is invoked) & comparing it later on for uniqueness with Object's equals(Obj). It gets a little hairy when "parallelism" manifests in execution, I know.
> I appreciate your suggestions.
>
> Thanks + have a great day.
> Amir
>
> ------------------------------
> *From:* Thomas Groh <[email protected]>
> *To:* [email protected]; amir bahmanyari <[email protected]>
> *Sent:* Monday, August 8, 2016 1:44 PM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> There's no way to guarantee that exactly one record is processed at a time. This is part of the design of ParDo to work efficiently across multiple processes and machines [1], where multiple instances of a DoFn must exist in order for progress to be made in a timely fashion. This includes processing the same element across multiple machines at the same time, with only one of the results being available in the output PCollection, as well as retries of failed elements.
>
> A runner is required to interact with a DoFn instance in a single-threaded manner - however, it is permitted to have multiple different DoFn instances active within a single process and across processes at any given time (for the same reasons as above). There's no support in the Beam model to restrict this type of execution. We do not encourage sharing objects between DoFn instances, and any shared state must be accessed in a thread-safe manner, and modifications to shared state should be idempotent, as otherwise retries and speculative execution may cause that state to be inconsistent. A DoFn will be reused for multiple elements across a single bundle, and may be reused across multiple bundles - if you require the DoFn to be "fresh" per element, it should perform any required setup at the start of the ProcessElement method.
>
> The best that can be done if it is absolutely required to restrict processing to a single element at a time would be to group all of the elements to a single key. Note that this will not solve the problem in all cases, as a runner is permitted to execute the group of elements multiple times so long as it only takes one completed bundle as the result, and additionally this removes the ability of the runner to balance work and introduces a performance bottleneck. To do so, you would key the inputs to a single static key and apply a GroupByKey, running the processing method on the output Iterable produced by the GroupByKey (directly; expanding the input iterable in a separate PCollection allows a runner to rebalance the elements, which will reintroduce parallelism). A minimal sketch of this approach appears at the end of this thread.
>
> [1] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java#L360
>
> On Mon, Aug 8, 2016 at 12:46 PM, amir bahmanyari <[email protected]> wrote:
>
> Hi Thomas,
> Thanks so much for your response. Here are answers to your questions.
>
> You have a specific collection of records stored in Kafka. You run your pipeline, and observe duplicate elements. Is that accurate?
> ==>> I send records to Kafka from my laptop. I use KafkaIO() to receive the records.
> I have confirmed that I don't get duplicates from Kafka. However, for some reason, certain parts of my code execute beyond the actual number of expected records, and subsequently produce extra resulting data.
> I tried playing with the triggering: stretching the window interval, discardingFiredPanes(), etc. - all kinds of modes. Same result. How can I guarantee that one record at a time executes in one unique instance of the inner class object?
> I have all the shared objects synchronized and am using Java ConcurrentHashMaps. How can I guarantee synchronized operations amongst "parallel pipelines"? Analogous to multiple threads accessing a shared object and trying to modify it...
>
> Here is my current KafkaIO() call:
>
> PCollection<String> kafkarecords = p
>     .apply(KafkaIO.read()
>         .withBootstrapServers("kafkahost:9092")
>         .withTopics(topics)
>         .withValueCoder(StringUtf8Coder.of())
>         .withoutMetadata())
>     .apply(Values.<String>create())
>     .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
>         .triggering(AfterWatermark.pastEndOfWindow())
>         .withAllowedLateness(Duration.ZERO)
>         .discardingFiredPanes());
>
> kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new DoFn<String, String>() { // I expect one record at a time to one object here
>
> ------------------------------
>
> Have you confirmed that you're getting duplicate records via other library transforms (such as applying Count.globally() to kafkarecords)?
> ==>> No duplicates from Kafka.
>
> ------------------------------
>
> Additionally, I'm not sure what you mean by "executes till a record lands on method".
> ==>> Sorry for my confusing statement. Like I mentioned above, I expect each record coming from Kafka to be assigned to one instance of the inner class, and therefore one instance of the pipeline executes it in parallel with others executing their own unique records.
>
> ------------------------------
>
> Additionally additionally, is this reproducible if you execute with the DirectRunner?
> ==>> I have not tried the DirectRunner. Should I?
>
> Thanks so much Thomas.
>
> ------------------------------
> *From:* Thomas Groh <[email protected]>
> *To:* [email protected]; amir bahmanyari <[email protected]>
> *Sent:* Monday, August 8, 2016 11:43 AM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> Just to make sure I understand the problem:
>
> You have a specific collection of records stored in Kafka. You run your pipeline, and observe duplicate elements. Is that accurate?
>
> Have you confirmed that you're getting duplicate records via other library transforms (such as applying Count.globally() to kafkarecords)?
>
> Additionally, I'm not sure what you mean by "executes till a record lands on method".
>
> Additionally additionally, is this reproducible if you execute with the DirectRunner? (A sketch of switching runners is at the end of this thread.)
>
> On Sun, Aug 7, 2016 at 11:44 PM, amir bahmanyari <[email protected]> wrote:
>
> Hi Colleagues,
> I refrained from posting this email before completing thorough testing. I think I did.
> My core code works perfectly & produces the expected result every single time when it is not wrapped with Beam KafkaIO to receive the data.
> Without KafkaIO, it receives the records from a flat data file. I repeated it and it always produced the right result.
> When I include Beam KafkaIO and embed the exact same code in an anonymous class running Beam pipelines, I get a different result every time I rerun it.
> Below is the snippet from where KafkaIO executes till a record lands on method.
> Kafka sends the precise number of records. No duplicates. All good.
> While executing in Beam, when the records are finished & I expect a correct result, it always produces something different. Different in different runs.
> I would appreciate shedding light on this issue. And thanks for your valuable time as always.
> Amir-
>
> public static synchronized void main(String[] args) throws Exception {
>
>     // Create Beam options for the Flink runner.
>     FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>     // Set the streaming engine as FlinkRunner
>     options.setRunner(FlinkPipelineRunner.class);
>     // This is a streaming process (as opposed to batch)
>     options.setStreaming(true);
>     // Create the DAG pipeline for parallel processing of independent LR records
>     Pipeline p = Pipeline.create(options);
>     // Kafka broker topic is identified as "lroad"
>     List<String> topics = Arrays.asList("lroad");
>
>     PCollection<String> kafkarecords = p
>         .apply(KafkaIO.read()
>             .withBootstrapServers("kafkahost:9092")
>             .withTopics(topics)
>             .withValueCoder(StringUtf8Coder.of())
>             .withoutMetadata())
>         .apply(Values.<String>create())
>         .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
>             .triggering(AfterWatermark.pastEndOfWindow())
>             .withAllowedLateness(Duration.ZERO)
>             .accumulatingFiredPanes());
>
>     kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new DoFn<String, String>() {
>
>         public void processElement(ProcessContext ctx) throws Exception {
>             *My core logic code here.*
>         }
>     }));
>     .
>     .
>     p.run(); // Start Beam pipeline(s) in the Flink cluster
> } // of main
> } // of class
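For reference, here is a minimal sketch of the single-key GroupByKey approach Thomas describes above, written against the same windowed kafkarecords collection as the snippets in this thread. The class, step, and key names are illustrative, and it follows the DoFn style used in the snippets (the DoFn API has changed across incubator releases). The caveats Thomas notes still apply: the runner may re-execute the grouped bundle, and all parallelism within a window is lost.

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class SingleKeySketch {
  /** Forces all elements of a window through one grouped Iterable. */
  static PCollection<String> processSequentially(PCollection<String> kafkarecords) {
    return kafkarecords
        // Every element gets the same static key, so GroupByKey emits a single
        // Iterable per window containing all of that window's records.
        .apply("KeyToSingleKey", WithKeys.<String, String>of("all-records"))
        .apply("GroupAllRecords", GroupByKey.<String, String>create())
        .apply("ProcessGroupedRecords", ParDo.of(new DoFn<KV<String, Iterable<String>>, String>() {
          public void processElement(ProcessContext ctx) throws Exception {
            // Iterate the grouped values directly; re-expanding them into a separate
            // PCollection would let the runner rebalance them and reintroduce parallelism.
            for (String record : ctx.element().getValue()) {
              ctx.output(record); // stand-in for the real per-record logic
            }
          }
        }));
  }
}

If correctness under parallelism is the real goal, making the shared-state updates idempotent (as in the next sketch) is usually preferable to serializing all processing behind one key.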
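Similarly, a minimal sketch of the two points from the August 9 reply: shared state touched only through atomic ConcurrentMap operations and kept idempotent, and mutable per-element state re-initialized at the start of processElement. The field, key, and helper names are illustrative only, not from the original pipeline.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.beam.sdk.transforms.DoFn;

class SharedStateSketchDoFn extends DoFn<String, String> {

  // Shared across every DoFn instance in this JVM; touch it only through atomic operations.
  private static final ConcurrentMap<String, String> SEEN = new ConcurrentHashMap<String, String>();

  // Mutable per-element scratch state: a DoFn instance is reused across elements and
  // bundles, so this must be re-initialized inside processElement, not in a constructor.
  private transient StringBuilder scratch;

  public void processElement(ProcessContext ctx) throws Exception {
    scratch = new StringBuilder();          // the "fresh per element" setup

    String record = ctx.element();

    // Anti-pattern: check-then-act is not atomic even on a ConcurrentMap; another DoFn
    // instance can insert the key between containsKey() and put():
    //   if (!SEEN.containsKey(record)) { SEEN.put(record, transform(record)); }

    // Atomic and idempotent: putIfAbsent cannot lose the race, and reprocessing the same
    // record (retries, speculative execution) leaves the map unchanged, so reruns agree.
    // A counter increment, by contrast, is atomic but not idempotent, and retried bundles
    // would re-apply it - consistent with the varying totals described above.
    SEEN.putIfAbsent(record, transform(record));

    ctx.output(scratch.append(SEEN.get(record)).toString());
  }

  private String transform(String record) {
    return record.trim();                   // stand-in for the real per-record logic
  }
}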
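And since the DirectRunner question came up: a sketch of a local repro that swaps FlinkPipelineRunner for the direct runner, assuming the beam-runners-direct-java artifact is on the classpath. The class name here matches the incubator-era SDK and may differ in other releases; the pipeline body itself is elided.

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class DirectRunnerRepro {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    // Swap FlinkPipelineRunner for the in-process direct runner so the Flink cluster
    // is out of the picture while reproducing the varying results locally.
    options.setRunner(DirectRunner.class);

    Pipeline p = Pipeline.create(options);
    // ... apply the same KafkaIO (or TextIO) transforms and DoFn as in the snippets above ...
    p.run();
  }
}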
