Hi Raghu,- I receive exact same number of records from Kafka each run.- I then 
invoke a method  say MyMethod thats internal to the DoFn inner class. - There 
is logic there in MyMethod to extract certain fields from the record & 
increment a counter based on satisfying a condition on those fields values.
That counter varies as per each run.Thanks Raghu.

      From: Raghu Angadi <[email protected]>
 To: [email protected]; amir bahmanyari <[email protected]> 
 Sent: Wednesday, July 13, 2016 1:52 PM
 Subject: Re: Random behavior with my Beam FlinkRunner streaming app
   
More details would be useful. You can include pseudo code for your job will be 
useful (or actual code with any specific sections removed).
How are you making sure you read exactly same same of records from Kafka in 
your reader? I don't see that you are configuring Kafka reader for that in the 
code below.
On Wed, Jul 13, 2016 at 12:30 PM, amir bahmanyari <[email protected]> wrote:

try{ PCollection<KV<String, String>> 
kafkarecords=p.apply(KafkaIO.read().withBootstrapServers("kafkaprt:9092").withTopics(topics).withValueCoder(StringUtf8Coder.of())
 .withoutMetadata()).apply(ParDo.named("startBundle").of( new DoFn<KV<byte[], 
String>, KV<String, String>>() {.....etc.




  

Reply via email to