Hey Amir,

One thing that would be really useful is understanding whether this is a
bug in KafkaIO or in Flink, or something else. Does the same behavior
happen with the DirectRunner?

Thanks,
Dan

On Thu, Aug 25, 2016 at 3:39 PM, amir bahmanyari <[email protected]>
wrote:

> I gave up on injecting delays in Pipeline design.
> I implemented the DELAYED data ingestion (as I described below in my
> email) sending from my laptop (with random delays) to Kafka, and receiving
> it as is in my Beam app by calling unbounded KafkaIO().
> Unlike the scenario with no delays between sends (burst): YES!
> KafkaIO() produced accurate results (after many months of testing it
> without intermittent random delays in ingestion).
> I can reproduce it with and without delays to show how KafkaIO() produces
> inaccurate results in the latter case (burst to Kafka-->Beam).
> At the moment, I am executing on a 2-node FlinkRunner cluster.
> I repeated this reproducing of the KafkaIO() behavior several times before
> typing this email.
> Pls let me know how we can investigate further.
>
> Cheers+thanks for all your help everyone.
> Amir-
>
> ------------------------------
> *From:* Lukasz Cwik <[email protected]>
> *To:* [email protected]
> *Cc:* amir bahmanyari <[email protected]>
> *Sent:* Wednesday, August 24, 2016 4:13 PM
> *Subject:* Re: TextIO().Read pipeline implementation question
>
> Amir, it seems like you're attempting to build a network simulation (
> https://en.wikipedia.org/wiki/Network_simulation). Are you sure Apache
> Beam is the right tool for this?
>
> On Wed, Aug 24, 2016 at 3:54 PM, Thomas Groh <[email protected]> wrote:
>
> The Beam model generally is agnostic to the rate at which elements are
> produced and consumed. Instead, it uses the concept of a watermark to
> provide a completion metric, and element timestamps to record when an event
> happened (which is independent of when the event was processed). Your
> pipeline should be correct regardless of the input rate by using the
> (data-based) timestamp of arriving elements instead of the time they
> arrived in the Pipeline. This allows you to describe the output of your
> Pipeline in terms of the input records (which have associated timestamps)
> rather than the rate at which input arrived. You can assign timestamps to
> an existing PCollection using the 'WithTimestamps' PTransform, or create a
> new PCollection where elements have associated timestamps using the
> 'Create.timestamped()' PTransform. Some sources will also output elements
> with a Timestamp already associated with the element (e.g. KafkaIO or
> PubSubIO).
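
To make the event-time point above concrete, here is a minimal plain-Java sketch. It deliberately uses no Beam dependency, and every name in it (EventTimeWindows, Event, countPerWindow) is made up for illustration. It assigns each record to a fixed window using only the timestamp carried in the record, so the per-window result does not depend on the order or rate at which records arrive.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

/** Plain-Java illustration only; none of these names are Beam APIs. */
final class EventTimeWindows {

    /** A record carrying its own (data-based) event timestamp in milliseconds. */
    static final class Event {
        final long timestampMillis;
        final String payload;

        Event(long timestampMillis, String payload) {
            this.timestampMillis = timestampMillis;
            this.payload = payload;
        }
    }

    /** Start of the fixed window containing the timestamp (non-negative timestamps assumed). */
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    /** Counts events per window start, using only the event timestamps. */
    static Map<Long, Integer> countPerWindow(List<Event> events, long windowSizeMillis) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (Event e : events) {
            counts.merge(windowStart(e.timestampMillis, windowSizeMillis), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical data: one record every 30 seconds, as in the car example.
        List<Event> inOrder = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            inOrder.add(new Event(i * 30_000L, "car-report-" + i));
        }
        // The same records in a different arrival order.
        List<Event> shuffled = new ArrayList<>(inOrder);
        Collections.shuffle(shuffled, new Random(42));

        long oneMinute = 60_000L;
        System.out.println(countPerWindow(inOrder, oneMinute));
        System.out.println(countPerWindow(shuffled, oneMinute));
    }
}
```

Both calls print the same per-window counts. In a real pipeline the windowing and watermark handling are done by Beam and the runner; this only shows why timestamp-based results should not change between burst and delayed ingestion.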
>
> If the sole desire is to rate limit your input, using
> CountingInput.unbounded().withRate(Duration) will output elements at a
> continuous rate to your downstream PCollection. This will output elements
> over time in such a way that the desired rate is reached.
>
> On Wed, Aug 24, 2016 at 3:34 PM, amir bahmanyari <[email protected]>
> wrote:
>
> Thanks for your response Ben.
> The sleep+read is a part of the problem solution requirements. I know what
> you mean by why not process them immediately.
> The problem solution intentionally slows down processing to simulate the
> traffic in expressway(s).
> The assumption is that each car emits a "record" every 30 seconds.
> To make a long story short, at runtime the behavior I described below is
> expected to be implemented to provide an accurate simulation.
> So let's say I want to inject a Sleep(random-seconds) into the pipeline
> superficially, before the ParDo actually gets into the action.
> What are the options to do that?
> And using TextIO(), how can I buffer the read records by TextIO() while
> Sleep() is in progress?
> Thanks for your valuable time.
>
>
> ------------------------------
> *From:* Ben Chambers <[email protected]>
> *To:* [email protected] ; amir bahmanyari <[email protected]>
> *Sent:* Wednesday, August 24, 2016 3:24 PM
>
> *Subject:* Re: TextIO().Read pipeline implementation question
>
> I think the most important question is why do you want to slow down the
> reads like that? If this is for testing purposes, there may be other
> options, such as test specific sources.
>
> At a high level, the process you describe sounds somewhat like an
> Unbounded Source, or perhaps an application of the not-yet-built Splittable
> DoFn (https://s.apache.org/splittable-do-fn).
>
> Even in those cases, "reading 100 records and then sleeping" is normally
> undesirable because it limits the throughput of the pipeline. If there were
> 1000 records waiting to be processed, why not process them?
>
> In general, a given step doesn't "submit elements to the next step". It
> just outputs the elements. This is important since there may be two steps
> that read from that PCollection, meaning that there isn't a single ParDo
> to submit the elements to.
>
> -- Ben
>
> On Wed, Aug 24, 2016 at 3:12 PM amir bahmanyari <[email protected]>
> wrote:
>
> Hi Dan,
> Thanks so much for your response.
> Let's focus on your "The other side" section below.
> I provided the target process I am trying to implement in my first email
> below.
> According to your "runners do not expose hooks to control how often they
> read records.", it looks like I am out of luck achieving that on a random
> basis.
> So I am trying to articulate an equivalent read/process that is as close
> as possible to what I want.
> From the "- Wake-up" step in my algorithm, I should be able to read
> records but no more than 100.
> Let's say I sleep for 150 milliseconds, - Wake-up, and read 100 records all
> at once, and submit them to a ParDo DoFn to process one by one.
> What would that pipeline implementation look like?
> Is there an example that shows how to "sleep 150 ms" in a pipeline, then
> read n records (e.g. 100) at once, and then submit them to a ParDo to
> process one by one, please?
> I have tried so many ways to implement it but keep getting weird
> compilation errors...
> I appreciate your help.
> Amir-
>
> ------------------------------
> *From:* Dan Halperin <[email protected]>
> *To:* [email protected] ; amir bahmanyari <[email protected]>
> *Sent:* Wednesday, August 24, 2016 1:42 PM
>
> *Subject:* Re: TextIO().Read pipeline implementation question
>
> Hi Amir,
>
> It is very hard to respond without sufficient details to reproduce. Can
> you please send a full pipeline that we can test with test data (e.g., the
> LICENSE file), including pipeline options (which runner, etc.)?
>
> The other side -- in general, runners do not expose hooks to control how
> often they read records. If you have something like TextIO.Read |
> ParDo.of(sleep for 1s) you will get 1s sleep per record, but you cannot
> control how this is interleaved with reading. A runner is free to read all
> the records before sleeping, read one record and sleep in a loop, and
> everything in between.
>
> Thanks,
> Dan
> On Tue, Aug 23, 2016 at 5:07 PM, amir bahmanyari <[email protected]>
> wrote:
>
> So here is what happened as a result of inserting a Window of random
> seconds for buffering between my TextIO().Read and DoFn<>:
> the number of records processed doubled :-((
> Why is that? Could someone shed light on this pls, I appreciate it very
> much.
> Thanks.
> Amir-
>
> ------------------------------
> *From:* amir bahmanyari <[email protected]>
> *To:* [email protected]
> *Sent:* Tuesday, August 23, 2016 4:40 PM
> *Subject:* Re: TextIO().Read pipeline implementation question
>
> Would this implementation work?
> I am thinking of buffering records within a window of a random number of
> seconds, processing each record in the DoFn, and then repeating with
> another window of random length:
>
> p.apply(TextIO.Read.from("/tmp/LRData.dat"))
>     .apply(Window.<String>into(FixedWindows.of(
>         Duration.standardSeconds((int)(((15-5) * r.nextDouble()) + 5)))))
>
> .apply("PseduLRDoFn", ParDo.of(new DoFn<String, String>() {
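
A side note on the window-length expression in the snippet above, as a small plain-Java check (the class and method names are hypothetical, nothing here is Beam). Because Random.nextDouble() returns a value in [0.0, 1.0) and the cast truncates, the expression yields whole seconds from 5 to 14 and never 15. It is also an ordinary Java argument, so it is evaluated once when FixedWindows.of(...) is called during pipeline construction; it does not pick a new length for each window.

```java
import java.util.Random;

/** Hypothetical helper that isolates the arithmetic used for the window length. */
final class RandomWindowSeconds {

    /** Same arithmetic as the expression passed to Duration.standardSeconds(...). */
    static int randomSeconds(Random r) {
        return (int) (((15 - 5) * r.nextDouble()) + 5);
    }

    public static void main(String[] args) {
        Random r = new Random();
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        for (int i = 0; i < 100_000; i++) {
            int s = randomSeconds(r);
            min = Math.min(min, s);
            max = Math.max(max, s);
        }
        // nextDouble() is in [0.0, 1.0), so truncation keeps results within 5..14.
        System.out.println("min=" + min + " max=" + max);
    }
}
```

If 15 should be reachable, something like 5 + r.nextInt(11) would cover 5 to 15 inclusive.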
>
> Thanks for your help.
> Amir-
>
>
> ------------------------------
> *From:* amir bahmanyari <[email protected]>
> *To:* [email protected]
> *Sent:* Tuesday, August 23, 2016 3:51 PM
> *Subject:* TextIO().Read pipeline implementation question
>
> Hi Colleagues,
> I have no problem reading through TextIO() & processing, all by default
> behavior.
>
> p.apply(TextIO.Read.from("/tmp/LRData.dat"))
>
> .apply("PseduLRDoFn", ParDo.of(new DoFn<String, String>() {
>
> I want to change this logic like the following:
>
> - Start executing TextIO().Read, but before reading anything yet
> - Sleep for a random number of seconds between 5 and 15
> - Wake-up
> - Read the records from the file (for the time-stamps) while TextIO().Read
> was asleep
> - Process records
> - Go back to putting TextIO() to sleep for a random number of seconds
> between 5 and 15, and continue until the end of the file is reached
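
The steps above can be sketched outside the pipeline, which is roughly where the thread ends up (delays applied on the sending side before the data reaches Beam). This is a minimal plain-Java sketch under stated assumptions: DelayedReplay, the batch size, the injectable sleeper, and the sink are all hypothetical and are not Beam or Kafka APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.function.Consumer;
import java.util.function.LongConsumer;

/** Hypothetical helper, not part of Beam: replays records in batches with random pauses. */
final class DelayedReplay {

    /**
     * Pauses for a random 5..15 seconds, hands over the next batch, and repeats
     * until every record has been delivered. Returns the total pause in milliseconds.
     */
    static long replay(List<String> records, int batchSize, Random random,
                       LongConsumer sleeper, Consumer<List<String>> sink) {
        long totalPauseMillis = 0;
        for (int start = 0; start < records.size(); start += batchSize) {
            long pauseMillis = 1000L * (5 + random.nextInt(11)); // 5..15 inclusive
            sleeper.accept(pauseMillis);
            totalPauseMillis += pauseMillis;
            int end = Math.min(start + batchSize, records.size());
            sink.accept(new ArrayList<>(records.subList(start, end)));
        }
        return totalPauseMillis;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("r1", "r2", "r3", "r4", "r5");
        // Scaled-down sleeper so the demo finishes quickly; a real driver would
        // sleep for the full duration and publish each batch (e.g. to Kafka).
        LongConsumer sleeper = millis -> {
            try {
                Thread.sleep(millis / 1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        replay(records, 2, new Random(), sleeper, batch -> System.out.println("batch: " + batch));
    }
}
```

Keeping the pauses in a separate driver leaves the pipeline itself free of sleeps, so its output can still be defined by the record timestamps rather than by arrival rate.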
>
> I appreciate your suggestions and/or if you can point me to an example.
> Cheers+thanks
> Amir-
>