Hi Raghu,AM cleaning up my code.Will share some of it when its clean.Thanks so
much.
From: Raghu Angadi <[email protected]>
To: [email protected]; amir bahmanyari <[email protected]>
Sent: Wednesday, July 13, 2016 3:52 PM
Subject: Re: Random behavior with my Beam FlinkRunner streaming app
On Wed, Jul 13, 2016 at 2:14 PM, amir bahmanyari <[email protected]> wrote:
Hi Raghu,- I receive exact same number of records from Kafka each run.
How do you do that? (curious + I just just want to make sure)
- I then invoke a method say MyMethod thats internal to the DoFn inner class.
- There is logic there in MyMethod to extract certain fields from the record &
increment a counter based on satisfying a condition on those fields values.
there are multiple questions I could ask here. But this is vague enough that
there could be lots of issues. If you are just incrementing a counter and
aggregating it correctly, there is no reason to expect different results.
Lots of people here would be interested in taking a look if you can reproduce
it outside your set up. But most likely root cause would be either the input
records or how you are aggregating the final count.
That counter varies as per each run.Thanks Raghu.
From: Raghu Angadi <[email protected]>
To: [email protected]; amir bahmanyari <[email protected]>
Sent: Wednesday, July 13, 2016 1:52 PM
Subject: Re: Random behavior with my Beam FlinkRunner streaming app
More details would be useful. You can include pseudo code for your job will be
useful (or actual code with any specific sections removed).
How are you making sure you read exactly same same of records from Kafka in
your reader? I don't see that you are configuring Kafka reader for that in the
code below.
On Wed, Jul 13, 2016 at 12:30 PM, amir bahmanyari <[email protected]> wrote:
try{ PCollection<KV<String, String>>
kafkarecords=p.apply(KafkaIO.read().withBootstrapServers("kafkaprt:9092").withTopics(topics).withValueCoder(StringUtf8Coder.of())
.withoutMetadata()).apply(ParDo.named("startBundle").of( new DoFn<KV<byte[],
String>, KV<String, String>>() {.....etc.