There's no way to guarantee that exactly one record is processed at a time.
This is part of the design of ParDo to work efficiently across multiple
processes and machines[1], where multiple instances of a DoFn must exist in
order for progress to be made in a timely fashion. This includes processing
the same element across multiple machines at the same time, with only one
of the results being available in the output PCollection, as well as
retries of failed elements.

A runner is required to interact with a DoFn instance in a single-threaded
manner - however, it is permitted to have multiple different DoFn instances
active within a single process and across processes at any given time (for
the same reasons as above). There's no support in the Beam model to
restrict this type of execution. We do not encourage sharing objects
between DoFn instances, and any shared state must be accessed in a
thread-safe manner, and modifications to shared state should be idempotent,
as otherwise retries and speculative execution may cause that state to be
inconsistent. A DoFn will be reused for multiple elements across a single
bundle, and may be reused across multiple bundles - if you require the DoFn
to be "fresh" per element, it should perform any required setup at the
start of the ProcessElement method.

The best that can be done if it is absolutely required to restrict
processing to a single element at a time would be to group all of the
elements to a single key. Note that this will not solve the problem in all
cases, as a runner is permitted to execute the group of elements multiple
times so long as it only takes one completed bundle as the result, and
additionally this removes the ability of the runner to balance work and
introduces a performance bottleneck. To do so, you would key the inputs to
a single static key and apply a GroupByKey, running the processing method
on the output Iterable produced by the GroupByKey (directly; expanding the
input iterable in a separate PCollection allows a runner to rebalance the
elements, which will reintroduce parallelism)`.

[1] https://github.com/apache/incubator-beam/blob/master/
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java#L360

On Mon, Aug 8, 2016 at 12:46 PM, amir bahmanyari <[email protected]>
wrote:

> Hi Thomas,
> Thanks so much for your response. Here are answers to your questions.
> You have a specific collection of records stored in Kafka. You run your
> pipeline, and observe duplicate elements. Is that accurate?
>
> ==>> I send records to Kafka from my laptop. I use KafkaIO() to receive
> the records. I have confirmed that I dont get duplicates from Kafka.
> However,
> for some reason, certain parts of my code execute beyond the actual number
> of expected number of records, and subsequently produce extra resulting
> data.
> I tried playing with the Triggering. Stretching the window interval,
> DiscardingFiredPanes etc. all kinds of modes.
> Same.  How can I guarantee that one record at a time executes in one
> unique instance of the inner class object?
> I have all the shared objects synchronized and am using Java concurrent
> hashmaps. How can I guarantee synchronized operations amongst "parallel
> pipelines"? Analogous to multiple threads accessing a shared object and
> trying to modify it...
>
> Here is my current KafkaIO() call:
> PCollection<String> kafkarecords = p.apply(KafkaIO.read().
> withBootstrapServers("kafkahost:9092").withTopics(topics).
> withValueCoder(StringUtf8Coder.of()).withoutMetadata()).apply(
> Values.<String>create()).apply(Window.<String>into(
> FixedWindows.of(Duration.standardMinutes(1)))
>          .triggering(AfterWatermark.pastEndOfWindow()).
> withAllowedLateness(Duration.ZERO)
>    .discardingFiredPanes());
>
>     kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new
> DoFn<String, String>() {.//I expect one record at a time to one object here
> ------------------------------------------------------------
> ------------------------------------------------------------
> -----------------------
>
> Have you confirmed that you're getting duplicate records via other library
> transforms (such as applying Count.globally() to kafkarecords)?
> ==>>No duplicates from Kafka.
> ------------------------------------------------------------
> ------------------------------------------------------------
> -----------------------
> Additionally, I'm not sure what you mean by "executes till a record lands
> on method"
> ==>>Sorry for my confusing statement. Like I mentioned above, I expect
> each record coming from Kafka gets assigned to one instance of the inner
> class and therefore one instance of the pipeline executed it in parallel
> with others executing their own unique records.
>
> ------------------------------------------------------------
> ------------------------------------------------------------
> -----------------------
>
> Additionally additionally, is this reproducible if you execute with the
> DirectRunner?
> ==>>I have not tried DirectRunner. Should I?
>
> Thanks so much Thomas.
>
>
> ------------------------------
> *From:* Thomas Groh <[email protected]>
> *To:* [email protected]; amir bahmanyari <[email protected]>
>
> *Sent:* Monday, August 8, 2016 11:43 AM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> Just to make sure I understand the problem:
>
> You have a specific collection of records stored in Kafka. You run your
> pipeline, and observe duplicate elements. Is that accurate?
>
> Have you confirmed that you're getting duplicate records via other library
> transforms (such as applying Count.globally() to kafkarecords)?
>
> Additionally, I'm not sure what you mean by "executes till a record lands
> on method"
>
> Additionally additionally, is this reproducible if you execute with the
> DirectRunner?
>
>
> On Sun, Aug 7, 2016 at 11:44 PM, amir bahmanyari <[email protected]>
> wrote:
>
> Hi Colleagues,
> I refrained from posting this email before completing thorough testing.
> I think I did.
> My core code works perfect & produces the expect result every single time
> without wrapping it with Beam KafkaIO to receive the data.
> Without KafkaIO, it receives the records from a flat data file. I repeated
> it and it always produced the right result.
> With including a Beam KarkaIO and embedding exact same code in a anonymous
> class running Beam pipelines, I get a different result every time I rerun
> it.
> Below is the snippet from where KafkaIO executes till a record lands on
> method.
> Kafka sends precise number of records. No duplicates. all good.
> While executing in Beam, when the records are finished & I expect a
> correct result, it always produces something different.
> Different in different runs.
> I appreciate shedding light on this issue.  And thanks for your valuable
> time as always.
> Amir-
>
> public static synchronized void main(String[] args) throws Exception {
>
> // Create Beam Options for the Flink Runner.
> FlinkPipelineOptions options = PipelineOptionsFactory.as(
> FlinkPipelineOptions.class);
> // Set the Streaming engine as FlinkRunner
> options.setRunner( FlinkPipelineRunner.class);
> // This is a Streaming process (as opposed to Batch=false)
> options.setStreaming(true);
> //Create the DAG pipeline for parallel processing of independent LR records
> Pipeline p = Pipeline.create(options);
> //Kafka broker topic is identified as "lroad"
> List<String> topics = Arrays.asList("lroad");
>
> PCollection<String> kafkarecords = p.apply(KafkaIO.read().
> withBootstrapServers(" kafkahost:9092").withTopics( topics).
> withValueCoder( StringUtf8Coder.of()). withoutMetadata()).apply(
> Values.<String>create()). apply(Window.<String>into(
> FixedWindows.of(Duration. standardMinutes(1)))
>          .triggering(AfterWatermark. pastEndOfWindow()).
> withAllowedLateness(Duration. ZERO)
>    .accumulatingFiredPanes());
>
>     kafkarecords.apply(ParDo. named("ProcessLRKafkaData"). of(new
> DoFn<String, String>() {
>
>                         public void processElement(ProcessContext ctx)
> throws Exception {
>
>                                         *My core logic code here.*
> }));
> .
> .
> p.run(); // Start Beam Pipeline(s) in FlinkC Cluster
> } // of main
> }// of class
>
>
>
>
>

Reply via email to