On Wed, Jul 13, 2016 at 2:14 PM, amir bahmanyari <[email protected]>
wrote:

> Hi Raghu,
> - I receive the exact same number of records from Kafka each run.
>

How do you do that? (curious + I just want to make sure)
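
For reference, if the goal is a repeatable read, one option is to bound the
source explicitly. If your KafkaIO version has withMaxNumRecords (worth
double-checking against your Beam version), a rough sketch with placeholder
broker/topic names would be:

  // Bound the unbounded Kafka source so every run consumes the same
  // number of records. Put withMaxNumRecords before withoutMetadata().
  PCollection<KV<byte[], String>> records = p.apply(
      KafkaIO.read()
          .withBootstrapServers("kafkaprt:9092")
          .withTopics(topics)
          .withValueCoder(StringUtf8Coder.of())
          .withMaxNumRecords(1000000)  // e.g. stop after exactly 1M records
          .withoutMetadata());

Note that withMaxNumRecords turns the read into a bounded one, so it is
mostly useful for testing/repro runs rather than a long-running streaming
job.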


> - I then invoke a method, say MyMethod, that's internal to the DoFn inner
> class.
> - There is logic in MyMethod to extract certain fields from the
> record & increment a counter based on satisfying a condition on those
> fields' values.
>

There are multiple questions I could ask here, but the description is vague
enough that there could be a lot of issues. If you are just incrementing a
counter and aggregating it correctly, there is no reason to expect different
results.

Lots of people here would be interested in taking a look if you can
reproduce it outside your setup. But the most likely root cause is either
the input records or how you are aggregating the final count.
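
For example, if all you need is a count of the records that satisfy the
condition, one structure that keeps the aggregation inside Beam (and so
should not drift between runs) is to emit the matching records and count
them with a transform. A rough sketch, where matches(...) is a placeholder
for your field check, the read is assumed bounded (e.g. via
withMaxNumRecords) so a global count is well defined, and the DoFn style
may need adjusting if your SDK version uses @ProcessElement:

  // Keep only the records that satisfy the condition, then let Beam
  // aggregate the count instead of a hand-rolled counter in MyMethod.
  // Uses org.apache.beam.sdk.transforms.Count.
  PCollection<Long> matchCount = kafkarecords
      .apply(ParDo.named("FilterMatches").of(
          new DoFn<KV<String, String>, KV<String, String>>() {
            @Override
            public void processElement(ProcessContext c) {
              if (matches(c.element().getValue())) {  // placeholder check
                c.output(c.element());
              }
            }
          }))
      .apply(Count.<KV<String, String>>globally());

You can then log or write out matchCount at the end of the pipeline and
compare it across runs.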


> That counter varies from run to run.
> Thanks Raghu.
>
> ------------------------------
> *From:* Raghu Angadi <[email protected]>
> *To:* [email protected]; amir bahmanyari <[email protected]>
>
> *Sent:* Wednesday, July 13, 2016 1:52 PM
> *Subject:* Re: Random behavior with my Beam FlinkRunner streaming app
>
> More details would be useful. Pseudo code for your job would help (or
> actual code with any specific sections removed).
>
> How are you making sure you read exactly the same number of records from
> Kafka in your reader? I don't see that you are configuring the Kafka reader
> for that in the code below.
>
> On Wed, Jul 13, 2016 at 12:30 PM, amir bahmanyari <[email protected]>
> wrote:
>
> try {
>   PCollection<KV<String, String>> kafkarecords = p
>       .apply(KafkaIO.read().withBootstrapServers("kafkaprt:9092")
>           .withTopics(topics).withValueCoder(StringUtf8Coder.of())
>           .withoutMetadata())
>       .apply(ParDo.named("startBundle").of(
>           new DoFn<KV<byte[], String>, KV<String, String>>() {
>             .....etc.
>
