Hey Amir, One thing that would be really useful is understanding whether this is a bug in KafkaIO or in Flink, or something else. Does the same behavior happen with the DirectRunner?
Thanks,
Dan

On Thu, Aug 25, 2016 at 3:39 PM, amir bahmanyari <[email protected]> wrote:
> I gave up on injecting delays in the pipeline design.
> I implemented the DELAYED data ingestion (as I described below in my email), sending from my laptop (with random delays) to Kafka, and receiving it as-is in my Beam app by calling the unbounded KafkaIO().
> Unlike the scenario with no delays between sends (burst): YES! KafkaIO() produced accurate results (after many months of testing it without intermittent random delayed ingestion).
> I can reproduce it with & without delays to show how KafkaIO() produces inaccurate results in the latter case (burst to Kafka --> Beam).
> At the moment, I am executing on a 2-node FlinkRunner cluster.
> I repeated this reproduction of the KafkaIO() behavior several times before typing this email.
> Pls let me know how we can investigate further.
>
> Cheers + thanks for all your help everyone.
> Amir-
>
> ------------------------------
> *From:* Lukasz Cwik <[email protected]>
> *To:* [email protected]
> *Cc:* amir bahmanyari <[email protected]>
> *Sent:* Wednesday, August 24, 2016 4:13 PM
> *Subject:* Re: TextIO().Read pipeline implementation question
>
> Amir, it seems like you're attempting to build a network simulation (https://en.wikipedia.org/wiki/Network_simulation). Are you sure Apache Beam is the right tool for this?
>
> On Wed, Aug 24, 2016 at 3:54 PM, Thomas Groh <[email protected]> wrote:
>
> The Beam model is generally agnostic to the rate at which elements are produced and consumed. Instead, it uses the concept of a watermark to provide a completion metric, and element timestamps to record when an event happened (which is independent of when the event was processed). Your pipeline should be correct regardless of the input rate if you use the (data-based) timestamp of arriving elements instead of the time they arrived in the pipeline.
> This allows you to describe the output of your pipeline in terms of the input records (which have associated timestamps) rather than the rate at which input arrived. You can assign timestamps to an existing PCollection using the 'WithTimestamps' PTransform, or create a new PCollection where elements have associated timestamps using the 'Create.timestamped()' PTransform. Some sources will also output elements with a timestamp already associated with each element (e.g. KafkaIO or PubSubIO).
>
> If the sole desire is to rate-limit your input, using CountingInput.unbounded().withRate(Duration) will output elements at a continuous rate to your downstream PCollection. This will output elements over time in such a way that the desired rate is reached.
>
> On Wed, Aug 24, 2016 at 3:34 PM, amir bahmanyari <[email protected]> wrote:
>
> Thanks for your response Ben.
> The sleep+read is part of the problem-solution requirements. I know what you mean by "why not process them immediately".
> The solution intentionally slows down processing to simulate the traffic in expressway(s).
> The assumption is that each car emits a "record" every 30 seconds.
> Making the story short, at runtime the behavior I described below is expected to be implemented, to accurately provide a simulated solution.
> So let's say I want to inject a Sleep(random-seconds) into the pipeline superficially, before the ParDo actually gets into action.
> What are the options to do that?
> And using TextIO(), how can I buffer the records read by TextIO() while Sleep() is in progress?
> Thanks for your valuable time.
>
> ------------------------------
> *From:* Ben Chambers <[email protected]>
> *To:* [email protected]; amir bahmanyari <[email protected]>
> *Sent:* Wednesday, August 24, 2016 3:24 PM
>
> *Subject:* Re: TextIO().Read pipeline implementation question
>
> I think the most important question is why you want to slow down the reads like that.
> If this is for testing purposes, there may be other options, such as test-specific sources.
>
> At a high level, the process you describe sounds somewhat like an Unbounded Source, or perhaps an application of the not-yet-built Splittable DoFn (https://s.apache.org/splittable-do-fn).
>
> Even in those cases, "reading 100 records and then sleeping" is normally undesirable because it limits the throughput of the pipeline. If there were 1000 records waiting to be processed, why not process them?
>
> In general, a given step doesn't "submit elements to the next step". It just outputs the elements. This is important since there may be two steps that read from that PCollection, meaning that there isn't a single ParDo to submit the elements to.
>
> -- Ben
>
> On Wed, Aug 24, 2016 at 3:12 PM amir bahmanyari <[email protected]> wrote:
>
> Hi Dan,
> Thanks so much for your response.
> Let's focus on your "The other side" section below.
> I described the target process I am trying to implement in my first email below.
> According to your "runners do not expose hooks to control how often they read records", it looks like I am out of luck to achieve that on a random basis.
> So, I am trying to articulate an equivalent read/process flow as close as possible to what I want.
> From the "- Wake up" step in my algorithm, I should be able to read records, but no more than 100.
> Let's say I sleep for 150 milliseconds, wake up, read 100 records all at once, and submit them to a ParDo DoFn to process one by one.
> What would that pipeline implementation look like?
> Is there an example that shows how to "sleep 150 ms" in a pipeline, then read n records (i.e. 100) at once, and then submit them to a ParDo to process one by one, pls?
> I have tried so many ways to implement it but keep getting weird compilation errors...
> I appreciate your help.
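The sleep-then-read-a-batch loop Amir asks about can at least be sketched outside Beam in plain Java (the class and method names below are illustrative, not Beam APIs). As Dan points out elsewhere in the thread, a Beam runner is free to interleave reading and sleeping however it likes, so this only models the intended behavior of "sleep 150 ms, then read up to 100 records and process them one by one":

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Consumer;

public class ThrottledFileReader {
    // Sleep a random duration in [minSleepMs, maxSleepMs), then read up to
    // batchSize records and hand each one to the processor; repeat until EOF.
    static int readWithDelays(BufferedReader in, int batchSize,
                              long minSleepMs, long maxSleepMs,
                              Consumer<String> process)
            throws IOException, InterruptedException {
        Random r = new Random();
        int total = 0;
        String line;
        boolean done = false;
        while (!done) {
            long sleep = minSleepMs
                    + (long) ((maxSleepMs - minSleepMs) * r.nextDouble());
            Thread.sleep(sleep);
            int n = 0;
            while (n < batchSize && (line = in.readLine()) != null) {
                process.accept(line); // "submit to the DoFn", one by one
                n++;
                total++;
            }
            done = n < batchSize; // hit EOF before filling the batch
        }
        return total;
    }

    public static void main(String[] args) throws Exception {
        String data = "rec1\nrec2\nrec3\nrec4\nrec5\n";
        List<String> seen = new ArrayList<>();
        // Zero sleep here just to keep the demo fast; Amir's case would
        // use minSleepMs = maxSleepMs = 150.
        int total = readWithDelays(new BufferedReader(new StringReader(data)),
                2, 0, 0, seen::add);
        System.out.println(total + " records: " + seen);
    }
}
```

Doing this inside a Beam pipeline would require a custom source, which is exactly the hook Dan says runners do not expose.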
> Amir-
>
> ------------------------------
> *From:* Dan Halperin <[email protected]>
> *To:* [email protected]; amir bahmanyari <[email protected]>
> *Sent:* Wednesday, August 24, 2016 1:42 PM
>
> *Subject:* Re: TextIO().Read pipeline implementation question
>
> Hi Amir,
>
> It is very hard to respond without sufficient details to reproduce. Can you please send a full pipeline that we can test with test data (e.g., the LICENSE file), including pipeline options (which runner, etc.)?
>
> The other side -- in general, runners do not expose hooks to control how often they read records. If you have something like TextIO.Read | ParDo.of(sleep for 1s) you will get a 1s sleep per record, but you cannot control how this is interleaved with reading. A runner is free to read all the records before sleeping, read one record and sleep in a loop, or anything in between.
>
> Thanks,
> Dan
>
> On Tue, Aug 23, 2016 at 5:07 PM, amir bahmanyari <[email protected]> wrote:
>
> So here is what happened as a result of inserting a window of random-seconds buffering in my TextIO().Read & DoFn<>:
> the number of records processed got doubled :-((
> Why is that? Could someone shed light on this pls, I appreciate it very much.
> Thanks.
> Amir-
>
> ------------------------------
> *From:* amir bahmanyari <[email protected]>
> *To:* [email protected]
> *Sent:* Tuesday, August 23, 2016 4:40 PM
> *Subject:* Re: TextIO().Read pipeline implementation question
>
> Would this implementation work?
> I am thinking to buffer records within a window of random-seconds length, process them (DoFn) record by record, and repeat with another window of random-seconds length:
>
> p.apply(TextIO.Read.from("/tmp/LRData.dat"))
>     .apply(Window.<String>into(FixedWindows.of(
>         Duration.standardSeconds((int) (((15 - 5) * r.nextDouble()) + 5)))))
>     .apply("PseduLRDoFn", ParDo.of(new DoFn<String, String>() {
>
> Thanks for your help.
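One small aside on the random window length used in the snippet above: because the `(int)` cast truncates toward zero and `Random.nextDouble()` returns values in [0.0, 1.0), the expression `(int)(((15-5) * r.nextDouble()) + 5)` yields whole seconds in [5, 14] and never produces 15. A minimal check of that expression in isolation:

```java
import java.util.Random;

public class RandomWindowLength {
    // The same expression used in the Window.into(FixedWindows.of(...))
    // snippet above: intended as "a random number of seconds between 5
    // and 15", but the int cast truncates, so the result is in [5, 14].
    static int randomSeconds(Random r) {
        return (int) (((15 - 5) * r.nextDouble()) + 5);
    }

    public static void main(String[] args) {
        Random r = new Random();
        for (int i = 0; i < 10000; i++) {
            int s = randomSeconds(r);
            if (s < 5 || s > 14) {
                throw new AssertionError("out of range: " + s);
            }
        }
        System.out.println("all samples in [5, 14]");
    }
}
```

Note also that this expression is evaluated once, at pipeline construction time, so every window in the pipeline gets the same (randomly chosen) fixed length; it does not re-roll a new length per window.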
> Amir-
>
> ------------------------------
> *From:* amir bahmanyari <[email protected]>
> *To:* [email protected]
> *Sent:* Tuesday, August 23, 2016 3:51 PM
> *Subject:* TextIO().Read pipeline implementation question
>
> Hi Colleagues,
> I have no problem reading through TextIO() & processing, all with default behavior.
>
> p.apply(TextIO.Read.from("/tmp/LRData.dat"))
>     .apply("PseduLRDoFn", ParDo.of(new DoFn<String, String>() {
>
> I want to change this logic to the following:
>
> - Start executing TextIO().Read, but before reading anything yet
> - Sleep for a random number of seconds between 5 & 15
> - Wake up
> - Read the records that arrived in the file (for the timestamps) while TextIO().Read was asleep
> - Process the records
> - Put TextIO() back to sleep for a random number of seconds between 5 & 15, and continue until the end of the file is reached
>
> I appreciate your suggestions and/or if you can point me to an example.
> Cheers+thanks
> Amir-
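Thomas's suggestion earlier in the thread, keying work off each record's embedded event timestamp instead of its arrival time, is the Beam-idiomatic answer to the algorithm above. The idea can be illustrated outside Beam with plain Java (the class, method, and record format below are illustrative, not Beam APIs): records carry their own timestamps, so grouping them into fixed 30-second windows gives the same result no matter how fast, slow, or out of order they arrive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class EventTimeBuckets {
    // Assign each "carId,eventTimeMillis" record to a fixed-size window
    // based on its embedded event timestamp, ignoring arrival order.
    static Map<Long, List<String>> bucket(List<String> records, long windowMillis) {
        Map<Long, List<String>> windows = new TreeMap<>();
        for (String rec : records) {
            String[] parts = rec.split(",");
            long eventTime = Long.parseLong(parts[1]);
            long windowStart = (eventTime / windowMillis) * windowMillis;
            windows.computeIfAbsent(windowStart, k -> new ArrayList<>())
                   .add(parts[0]);
        }
        return windows;
    }

    public static void main(String[] args) {
        // Records arrive out of order; grouping depends only on event time.
        List<String> arrived = Arrays.asList("car2,61000", "car1,5000", "car3,15000");
        System.out.println(bucket(arrived, 30000L));
        // {0=[car1, car3], 60000=[car2]}
    }
}
```

This is why the thread keeps steering away from sleeping in the pipeline: if the windows are driven by record timestamps (as KafkaIO and PubSubIO support natively), injecting ingestion delays changes when results appear, not what they are.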
