Hi Raghu,
- I receive the exact same number of records from Kafka on each run.
- I then invoke a method, say MyMethod, that's internal to the DoFn inner class.
- The logic in MyMethod extracts certain fields from the record and
  increments a counter when those fields' values satisfy a condition.
  That counter varies from run to run.
Thanks, Raghu.
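
To make the description above concrete, here is a minimal sketch of that kind of counting logic in plain Java, outside of Beam. Everything in it is an assumption for illustration: the class name CounterSketch, the comma-separated record layout, and the condition (first field equals "A" and second field is greater than 50) are hypothetical and are not taken from the actual job. Running the logic alone like this on a fixed list of records is one way to check whether the count is stable when the input is identical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the logic inside MyMethod; not the real job code.
class CounterSketch {

    // Assumed record layout: comma-separated fields, e.g. "A,60".
    // Assumed condition: first field is "A" and second field is > 50.
    static boolean matches(String record) {
        String[] fields = record.split(",");
        if (fields.length < 2) {
            return false; // malformed record, does not count
        }
        try {
            int value = Integer.parseInt(fields[1].trim());
            return "A".equals(fields[0].trim()) && value > 50;
        } catch (NumberFormatException e) {
            return false; // non-numeric second field, does not count
        }
    }

    // Counts the records that satisfy the condition, using a local counter.
    static long countMatching(List<String> records) {
        long counter = 0;
        for (String record : records) {
            if (matches(record)) {
                counter++;
            }
        }
        return counter;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("A,60", "A,40", "B,70", "A,90");
        System.out.println(countMatching(records)); // prints 2
    }
}
```

With a fixed input list the result is the same on every invocation, so if the count still varies inside the pipeline, the difference would have to come from somewhere other than this per-record logic.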
From: Raghu Angadi <[email protected]>
To: [email protected]; amir bahmanyari <[email protected]>
Sent: Wednesday, July 13, 2016 1:52 PM
Subject: Re: Random behavior with my Beam FlinkRunner streaming app
More details would be useful. Including pseudocode for your job would help
(or the actual code with any specific sections removed).
How are you making sure you read exactly the same number of records from Kafka
in your reader? I don't see that you are configuring the Kafka reader for that
in the code below.
On Wed, Jul 13, 2016 at 12:30 PM, amir bahmanyari <[email protected]> wrote:
try {
  PCollection<KV<String, String>> kafkarecords = p
      .apply(KafkaIO.read().withBootstrapServers("kafkaprt:9092")
          .withTopics(topics).withValueCoder(StringUtf8Coder.of())
          .withoutMetadata())
      .apply(ParDo.named("startBundle").of(
          new DoFn<KV<byte[], String>, KV<String, String>>() {.....etc.