Hi Thomas,

Thanks so much for your response. Here are answers to your questions.

You have a specific collection of records stored in Kafka. You run your
pipeline, and observe duplicate elements. Is that accurate?
==>> I send records to Kafka from my laptop and use KafkaIO() to receive
them. I have confirmed that I don't get duplicates from Kafka. However, for
some reason, certain parts of my code execute more times than the expected
number of records, and subsequently produce extra resulting data. I tried
playing with the triggering: stretching the window interval,
discardingFiredPanes(), all kinds of modes. Same result. How can I guarantee
that one record at a time executes in one unique instance of the inner class
object? I have all the shared objects synchronized and am using Java
ConcurrentHashMaps. How can I guarantee synchronized operations amongst
"parallel pipelines"? Analogous to multiple threads accessing a shared
object and trying to modify it...
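To make the question concrete, here is a minimal sketch of the pattern I am
hoping to guarantee (illustrative only, mirroring the 0.x-style DoFn from my
snippet below; "ProcessOneRecord" and the trim() call are placeholders, not
my real code):

kafkarecords.apply(ParDo.named("ProcessOneRecord").of(new DoFn<String, String>() {
    public void processElement(ProcessContext ctx) throws Exception {
        // Everything below is a local variable: no fields on the DoFn,
        // no statics shared between workers. If each element touches
        // only locals, parallel invocations cannot interfere and no
        // synchronization is needed.
        String record = ctx.element();
        String result = record.trim(); // placeholder for my core logic
        ctx.output(result);
    }
}));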
Here is my current KafkaIO() call:

PCollection<String> kafkarecords = p
    .apply(KafkaIO.read()
        .withBootstrapServers("kafkahost:9092")
        .withTopics(topics)
        .withValueCoder(StringUtf8Coder.of())
        .withoutMetadata())
    .apply(Values.<String>create())
    .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());

kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new DoFn<String, String>() {
    // I expect one record at a time reaching one object here
-----------------------------------------------------------------------------------------------------------------------------------------------
Have you confirmed that you're getting duplicate records via other library
transforms (such as applying Count.globally() to kafkarecords)?
==>> I have not checked with Count.globally(), but I have confirmed there
are no duplicates coming from Kafka itself.
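If I were to check with Count.globally(), I assume it would look roughly
like this (a sketch; I believe withoutDefaults() is needed because of my
fixed windows, and the logging DoFn is just illustrative):

kafkarecords
    .apply(Count.<String>globally().withoutDefaults())
    .apply(ParDo.of(new DoFn<Long, Long>() {
        public void processElement(ProcessContext ctx) throws Exception {
            // Log the per-window element count to compare against the
            // number of records published to Kafka.
            System.out.println("Window element count: " + ctx.element());
            ctx.output(ctx.element());
        }
    }));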
-----------------------------------------------------------------------------------------------------------------------------------------------
Additionally, I'm not sure what you mean by "executes till a record lands on
method"
==>> Sorry for my confusing statement. As I mentioned above, I expect each
record coming from Kafka to be assigned to one instance of the inner class,
so that one instance of the pipeline executes it in parallel with other
instances executing their own unique records.
-----------------------------------------------------------------------------------------------------------------------------------------------
Additionally additionally, is this reproducible if you execute with the
DirectRunner?
==>> I have not tried the DirectRunner. Should I?
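If I do try it, I assume the only change would be roughly this (a sketch; I
have not verified the exact runner class name in the version I am running):

// Swap the Flink runner for the in-process DirectRunner; the rest of
// the pipeline stays the same.
PipelineOptions directOptions = PipelineOptionsFactory.create();
directOptions.setRunner(DirectRunner.class);
Pipeline p = Pipeline.create(directOptions);
// ... same transforms as above, then p.run();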
Thanks so much, Thomas.
From: Thomas Groh <[email protected]>
To: [email protected]; amir bahmanyari <[email protected]>
Sent: Monday, August 8, 2016 11:43 AM
Subject: Re: Is Beam pipeline runtime behavior inconsistent?
Just to make sure I understand the problem:
You have a specific collection of records stored in Kafka. You run your
pipeline, and observe duplicate elements. Is that accurate?
Have you confirmed that you're getting duplicate records via other library
transforms (such as applying Count.globally() to kafkarecords)?
Additionally, I'm not sure what you mean by "executes till a record lands on
method"
Additionally additionally, is this reproducible if you execute with the
DirectRunner?
On Sun, Aug 7, 2016 at 11:44 PM, amir bahmanyari <[email protected]> wrote:
Hi Colleagues,

I refrained from posting this email before completing thorough testing. I
think I did. My core code works perfectly and produces the expected result
every single time when it is not wrapped with Beam KafkaIO to receive the
data. Without KafkaIO, it receives the records from a flat data file; I
repeated it and it always produced the right result. With a Beam KafkaIO
included, and the exact same code embedded in an anonymous class running
Beam pipelines, I get a different result every time I rerun it. Below is the
snippet from where KafkaIO executes till a record lands on method. Kafka
sends the precise number of records. No duplicates. All good. While
executing in Beam, when the records are finished and I expect a correct
result, it always produces something different, and different in different
runs. I appreciate shedding light on this issue, and thanks for your
valuable time as always.

Amir-
public static synchronized void main(String[] args) throws Exception {
    // Create Beam options for the Flink runner.
    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    // Set the streaming engine as FlinkRunner.
    options.setRunner(FlinkPipelineRunner.class);
    // This is a streaming process (as opposed to batch).
    options.setStreaming(true);
    // Create the DAG pipeline for parallel processing of independent LR records.
    Pipeline p = Pipeline.create(options);
    // The Kafka broker topic is identified as "lroad".
    List<String> topics = Arrays.asList("lroad");
    PCollection<String> kafkarecords = p
        .apply(KafkaIO.read()
            .withBootstrapServers("kafkahost:9092")
            .withTopics(topics)
            .withValueCoder(StringUtf8Coder.of())
            .withoutMetadata())
        .apply(Values.<String>create())
        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Duration.ZERO)
            .accumulatingFiredPanes());
    kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new DoFn<String, String>() {
        public void processElement(ProcessContext ctx) throws Exception {
            // My core logic code here.
        }
    }));
    p.run(); // Start Beam pipeline(s) in the Flink cluster.
} // of main
} // of class