A bundle is a logical grouping of records, controlled by the runner, used to break down large datasets into smaller, manageable pieces.
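To make the idea concrete, here is a minimal plain-Java sketch (no Beam dependency; the class name and bundle size are hypothetical) of a runner-like step chopping a record stream into bundles:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of bundling: split a list of records into
// fixed-size groups, the way a runner hands a DoFn one bundle at a time.
public class BundleDemo {
    static List<List<Integer>> intoBundles(List<Integer> records, int bundleSize) {
        List<List<Integer>> bundles = new ArrayList<>();
        for (int i = 0; i < records.size(); i += bundleSize) {
            // Each sublist is one "bundle" of work.
            bundles.add(new ArrayList<>(
                records.subList(i, Math.min(i + bundleSize, records.size()))));
        }
        return bundles;
    }

    public static void main(String[] args) {
        List<Integer> records = new ArrayList<>();
        for (int i = 0; i < 10; i++) records.add(i);
        System.out.println(intoBundles(records, 4));
        // prints [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
    }
}
```

In a real runner the bundle size is not fixed or user-visible; it is chosen by the runner, which is why a streaming source like KafkaIO can still hand your DoFn groups of records rather than strictly one at a time.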
Collect all the records you see during processElement into a list inside your DoFn, then output a single log line in finishBundle with all the elements you saw. Clear the list after outputting the log line. You'll then see the bundles of work your DoFns are processing and can logically see what's occurring.

On Wed, Jul 13, 2016 at 3:47 PM, amir bahmanyari <[email protected]> wrote:

> Thanks Lukasz.
> "logging the records you see for each bundle" makes me wonder why you
> are referring to "bundle"?
> Sorry, my confusion.
> My assumption is that I receive "one record" at a time from Kafka, and I
> am executing "one record" at a time in my DoFn class object.
> Is there something in the way I am invoking KafkaIO that translates to a
> "bundle" rather than a "single record" at a time?
>
> try{
>   PCollection<KV<String, String>> kafkarecords =
>       p.apply(KafkaIO.read()
>           .withBootstrapServers("kafkaprt:9092")
>           .withTopics(topics)
>           .withValueCoder(StringUtf8Coder.of())
>           .withoutMetadata())
>        .apply(ParDo.named("startBundle").of(
>            new DoFn<KV<byte[], String>, KV<String, String>>() {
>
> Perhaps I am overlapping / repeating... somehow...
> Thanks again, Lukasz.
>
> ------------------------------
> *From:* Lukasz Cwik <[email protected]>
> *To:* [email protected]; amir bahmanyari <[email protected]>
> *Sent:* Wednesday, July 13, 2016 12:38 PM
> *Subject:* Re: Random behavior with my Beam FlinkRunner streaming app
>
> Try using smaller datasets and logging the records you see for each bundle
> in your DoFns. This will help you see how your data is transitioning
> through the pipeline.
> On Wed, Jul 13, 2016 at 3:30 PM, amir bahmanyari <[email protected]> wrote:
>
> Thanks Lukasz,
> I receive records via Kafka into my Beam app with KafkaIO.read(). This is how:
>
> try{
>   PCollection<KV<String, String>> kafkarecords =
>       p.apply(KafkaIO.read()
>           .withBootstrapServers("kafkaprt:9092")
>           .withTopics(topics)
>           .withValueCoder(StringUtf8Coder.of())
>           .withoutMetadata())
>        .apply(ParDo.named("startBundle").of(
>            new DoFn<KV<byte[], String>, KV<String, String>>() {
>   ...etc.
>
> Please let me know if I should provide further, deeper details... I appreciate
> your attention.
> I am sure there are lessons for me to learn from "this" :-)
> Cheers + thanks so much.
>
> ------------------------------
> *From:* Lukasz Cwik <[email protected]>
> *To:* [email protected]; amir bahmanyari <[email protected]>
> *Sent:* Wednesday, July 13, 2016 12:22 PM
> *Subject:* Re: Random behavior with my Beam FlinkRunner streaming app
>
> Are your DoFns idempotent, and do they avoid relying on the ordering of elements?
> Do you use any triggers?
>
> Lots of things can add non-determinism to your output; we need more details
> about what your pipeline does.
> Using smaller input datasets can help you track down the source of
> non-determinism.
>
> On Wed, Jul 13, 2016 at 3:09 PM, amir bahmanyari <[email protected]> wrote:
>
> Hi Colleagues,
> I am getting random results for:
> - the exact same data input
> - the exact same app binary
> - the exact same Flink cluster instances
> Everything is fixed; I just repeat running the same thing.
> Every time, I get a different result while the data doesn't change, the code
> doesn't change, and the logic to calculate results is exactly the same...
>
> Is Beam "parallelism" playing a role due to something "unusual" in my code?
> What could the "unusual" be in the app that may make the Beam pipeline
> produce different results for the exact same "everything"?
> Thanks + regards,
> Amir
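The buffer-then-log-in-finishBundle technique suggested above can be sketched as follows. This is a plain-Java simulation (class and method names are hypothetical, and it omits the Beam types) so it runs standalone; in an actual DoFn the two methods correspond to processElement and finishBundle:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

// Sketch of the bundle-logging technique: buffer records per bundle,
// emit one log line per bundle, then reset for the next bundle.
public class BundleLoggingFn {
    private static final Logger LOG = Logger.getLogger(BundleLoggingFn.class.getName());
    private final List<String> seenInBundle = new ArrayList<>();

    // Called once per record: buffer instead of logging immediately.
    public void processElement(String record) {
        seenInBundle.add(record);
    }

    // Called once per bundle: log everything buffered, then clear the list.
    public List<String> finishBundle() {
        LOG.info("Bundle contained " + seenInBundle.size()
                + " records: " + seenInBundle);
        List<String> snapshot = new ArrayList<>(seenInBundle);
        seenInBundle.clear();
        return snapshot; // returned only so the sketch is easy to exercise
    }
}
```

Running this per bundle makes the runner's bundling visible: each log line shows exactly which records arrived together, which is useful when chasing non-deterministic output like the behavior described in this thread.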
