Hi Raghu,
I am cleaning up my code. Will share some of it when it's clean. Thanks so
much.

      From: Raghu Angadi <[email protected]>
 To: [email protected]; amir bahmanyari <[email protected]> 
 Sent: Wednesday, July 13, 2016 3:52 PM
 Subject: Re: Random behavior with my Beam FlinkRunner streaming app
   

On Wed, Jul 13, 2016 at 2:14 PM, amir bahmanyari <[email protected]> wrote:

Hi Raghu,
- I receive the exact same number of records from Kafka each run.

How do you do that? (Curious, and I just want to make sure.)
- I then invoke a method, say MyMethod, that's internal to the DoFn inner class.
- There is logic in MyMethod to extract certain fields from the record and
increment a counter when those fields' values satisfy a condition.

There are multiple questions I could ask here, but this is vague enough that
there could be lots of issues. If you are just incrementing a counter and
aggregating it correctly, there is no reason to expect different results.
Lots of people here would be interested in taking a look if you can reproduce
it outside your setup. But the most likely root cause is either the input
records or how you are aggregating the final count.
That counter varies with each run.
Thanks, Raghu.
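
To illustrate the point about aggregation, here is a minimal, self-contained sketch in plain Java. It is not Beam code and is not taken from the job discussed in this thread. It assumes, purely for illustration, that the counter is an instance field of the processing class and that the runner spreads the same input over several instances in a different way on each run. The names CounterSketch and Worker, and the "divisible by 3" condition, are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

final class CounterSketch {

    /** Stand-in for one processing instance: the counter is a plain instance field. */
    static final class Worker {
        long matched = 0;

        void process(int record) {
            // Hypothetical condition on the record's fields.
            if (record % 3 == 0) {
                matched++;
            }
        }
    }

    /** Spreads the records over the workers pseudo-randomly; the seed models one run. */
    static List<Worker> run(int[] records, int parallelism, long seed) {
        List<Worker> workers = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            workers.add(new Worker());
        }
        Random random = new Random(seed);
        for (int record : records) {
            workers.get(random.nextInt(parallelism)).process(record);
        }
        return workers;
    }

    /** Aggregation step: sums the per-instance counters. */
    static long total(List<Worker> workers) {
        long sum = 0;
        for (Worker worker : workers) {
            sum += worker.matched;
        }
        return sum;
    }

    public static void main(String[] args) {
        int[] records = new int[300];
        for (int i = 0; i < records.length; i++) {
            records[i] = i;
        }
        for (long seed = 1; seed <= 3; seed++) {
            List<Worker> workers = run(records, 4, seed);
            System.out.println("run " + seed
                    + ": first instance counted " + workers.get(0).matched
                    + ", aggregated total " + total(workers));
        }
    }
}
```

With identical input, the aggregated total is 100 for every seed, while the count held by any single instance depends on how the records happened to be distributed in that run. Whether this is what happens in the actual job cannot be determined from the code fragment posted so far.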

      From: Raghu Angadi <[email protected]>
 To: [email protected]; amir bahmanyari <[email protected]> 
 Sent: Wednesday, July 13, 2016 1:52 PM
 Subject: Re: Random behavior with my Beam FlinkRunner streaming app
   
More details would be useful. Including pseudocode for your job would help
(or actual code with any specific sections removed).
How are you making sure you read exactly the same number of records from Kafka
in your reader? I don't see that you are configuring the Kafka reader for that
in the code below.
On Wed, Jul 13, 2016 at 12:30 PM, amir bahmanyari <[email protected]> wrote:

try {
  PCollection<KV<String, String>> kafkarecords = p
      .apply(KafkaIO.read()
          .withBootstrapServers("kafkaprt:9092")
          .withTopics(topics)
          .withValueCoder(StringUtf8Coder.of())
          .withoutMetadata())
      .apply(ParDo.named("startBundle").of(
          new DoFn<KV<byte[], String>, KV<String, String>>() { .....etc.




   



  
