On Wed, Jul 13, 2016 at 2:14 PM, amir bahmanyari <[email protected]> wrote:
> Hi Raghu,
> - I receive the exact same number of records from Kafka each run.

How do you do that? (curious + I just want to make sure)

> - I then invoke a method, say MyMethod, that's internal to the DoFn inner class.
> - There is logic there in MyMethod to extract certain fields from the record & increment a counter based on satisfying a condition on those fields' values.

There are multiple questions I could ask here, but this is vague enough that there could be lots of issues. If you are just incrementing a counter and aggregating it correctly, there is no reason to expect different results.

Lots of people here would be interested in taking a look if you can reproduce it outside your setup. But the most likely root cause is either the input records or how you are aggregating the final count.

> That counter varies as per each run.
> Thanks Raghu.
>
> ------------------------------
> *From:* Raghu Angadi <[email protected]>
> *To:* [email protected]; amir bahmanyari <[email protected]>
> *Sent:* Wednesday, July 13, 2016 1:52 PM
> *Subject:* Re: Random behavior with my Beam FlinkRunner streaming app
>
> More details would be useful. Pseudo code for your job would help (or actual code with any specific sections removed).
>
> How are you making sure you read exactly the same set of records from Kafka in your reader? I don't see that you are configuring the Kafka reader for that in the code below.
>
> On Wed, Jul 13, 2016 at 12:30 PM, amir bahmanyari <[email protected]> wrote:
>
> try {
>     PCollection<KV<String, String>> kafkarecords =
>         p.apply(KafkaIO.read()
>                 .withBootstrapServers("kafkaprt:9092")
>                 .withTopics(topics)
>                 .withValueCoder(StringUtf8Coder.of())
>                 .withoutMetadata())
>          .apply(ParDo.named("startBundle").of(
>              new DoFn<KV<byte[], String>, KV<String, String>>() {
>                  .....etc.
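
To make Raghu's two suggestions concrete, here is a minimal sketch of one way to do both: bound the Kafka read so every run consumes a fixed amount of input, and emit one element per matching record so that Beam itself aggregates the count, rather than incrementing a field inside the DoFn (DoFn instances can be created per bundle and per worker, and failed bundles can be retried, so a plain Java counter inside MyMethod can legitimately vary between runs). This sketch is written against a newer KafkaIO API than the 0.x snippet quoted above (withKeyDeserializer/withValueDeserializer, @ProcessElement, withMaxNumRecords); the broker address, topic name, record limit, and matchesCondition() predicate are placeholders, not values from the thread.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CountMatchingRecords {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    PCollection<KV<byte[], String>> records =
        p.apply(KafkaIO.<byte[], String>read()
                .withBootstrapServers("kafkaprt:9092")            // placeholder broker
                .withTopic("mytopic")                              // placeholder topic
                .withKeyDeserializer(ByteArrayDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                // Bounds the otherwise unbounded read. Runs are only comparable if the
                // topic already holds this many records and the consumer starts from the
                // same offsets (auto.offset.reset / consumer group also matter).
                .withMaxNumRecords(100_000)
                .withoutMetadata());

    PCollection<Long> matches =
        records
            .apply("FilterMatches", ParDo.of(new DoFn<KV<byte[], String>, Integer>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                // Emit a single element per record that satisfies the condition.
                if (matchesCondition(c.element().getValue())) {
                  c.output(1);
                }
              }
            }))
            // Let the runner aggregate the final count deterministically.
            .apply("CountMatches", Count.<Integer>globally());

    // `matches` can be written out (e.g. with TextIO) or checked with PAssert in a test.
    p.run().waitUntilFinish();
  }

  // Hypothetical stand-in for the field checks inside MyMethod.
  private static boolean matchesCondition(String record) {
    return record.contains("SOME_FIELD_VALUE");
  }
}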

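If the suspicion is instead that the input itself differs between runs, one way to check is to keep the Kafka metadata (i.e. do not call .withoutMetadata()) and write out topic/partition/offset for every record, then diff the output of two runs. Again a sketch against a newer KafkaIO API than the one quoted in the thread; the broker, topic, record limit, and output path are placeholders.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LogKafkaOffsets {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    p.apply(KafkaIO.<byte[], String>read()
            .withBootstrapServers("kafkaprt:9092")            // placeholder broker
            .withTopic("mytopic")                              // placeholder topic
            .withKeyDeserializer(ByteArrayDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withMaxNumRecords(100_000))                       // bound the read so the job finishes
     .apply("FormatOffsets", ParDo.of(new DoFn<KafkaRecord<byte[], String>, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          KafkaRecord<byte[], String> r = c.element();
          // One line per record: topic/partition/offset. Diffing this output across
          // runs shows whether the records being read are actually the same.
          c.output(r.getTopic() + "/" + r.getPartition() + "/" + r.getOffset());
        }
      }))
     .apply(TextIO.write().to("kafka-offsets"));               // placeholder output prefix

    p.run().waitUntilFinish();
  }
}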