Hi Amir,

It is very hard to respond without enough detail to reproduce the problem. Can you
please send a full pipeline that we can test with test data (e.g., the
LICENSE file), including pipeline options (which runner, etc.)?

On your other question: in general, runners do not expose hooks to control
how often they read records. If you have something like TextIO.Read |
ParDo.of(sleep for 1s), you will get a 1s sleep per record, but you cannot
control how this is interleaved with reading. A runner is free to read all
the records before sleeping, to read one record and sleep in a loop, or to
do anything in between.
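
To make the interleaving point concrete, here is a toy model in plain Java.
It is not Beam code and does not use any runner API; the class and method
names (InterleavingSketch, readAllThenProcess, readOneThenProcess) are made
up for illustration. It only records the order of events for two schedules
that a runner would both be allowed to pick, and the sleep is represented by
a log entry rather than a real Thread.sleep call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model (hypothetical, not Beam): two legal schedules for "read | sleep per record". */
public class InterleavingSketch {

    /** Schedule A: read every record first, then run the per-record step on each one. */
    public static List<String> readAllThenProcess(List<String> records) {
        List<String> events = new ArrayList<>();
        for (String r : records) {
            events.add("read " + r);
        }
        for (String r : records) {
            events.add("sleep+process " + r);
        }
        return events;
    }

    /** Schedule B: read one record, run the per-record step on it, then repeat. */
    public static List<String> readOneThenProcess(List<String> records) {
        List<String> events = new ArrayList<>();
        for (String r : records) {
            events.add("read " + r);
            events.add("sleep+process " + r);
        }
        return events;
    }

    /** Counts the events in the log that start with the given prefix. */
    public static long count(List<String> events, String prefix) {
        return events.stream().filter(e -> e.startsWith(prefix)).count();
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "c");
        // Both schedules contain the same events; only the order differs.
        System.out.println(readAllThenProcess(records));
        System.out.println(readOneThenProcess(records));
    }
}
```

Both schedules perform exactly one sleep per record, which is all the
pipeline itself specifies. The order of reads relative to sleeps differs
between them, and that order is the part the pipeline cannot control.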

Thanks,
Dan

On Tue, Aug 23, 2016 at 5:07 PM, amir bahmanyari <[email protected]>
wrote:

> So here is what happened as a result of inserting Window of random seconds
> buffering in my TextIO().Read & DoFn<>:
> the number of records processed got doubled :-((
> Why is that? Could someone shed light on this pls, I appreciate it very
> much.
> Thanks.
> Amir-
>
> ------------------------------
> *From:* amir bahmanyari <[email protected]>
> *To:* "[email protected]" <[email protected]>
> *Sent:* Tuesday, August 23, 2016 4:40 PM
> *Subject:* Re: TextIO().Read pipeline implementation question
>
> Would this implementation work?
> I am thinking of buffering records within a window of a random number of
> seconds, processing each record in the DoFn, and then repeating with
> another window of random length:
>
> p.apply(TextIO.Read.from("/tmp/LRData.dat"))
> .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(
>     (int)(((15-5) * r.nextDouble()) + 5)))))
> .apply("PseduLRDoFn", ParDo.of(new DoFn<String, String>() {
>
> Thanks for your help.
> Amir-
>
>
> ------------------------------
> *From:* amir bahmanyari <[email protected]>
> *To:* "[email protected]" <[email protected]>
> *Sent:* Tuesday, August 23, 2016 3:51 PM
> *Subject:* TextIO().Read pipeline implementation question
>
> Hi Colleagues,
> I have no problem reading through TextIO() & processing, all by default
> behavior.
> p.apply(TextIO.Read.from("/tmp/LRData.dat"))
> .apply("PseduLRDoFn", ParDo.of(new DoFn<String, String>() {
>
> I want to change this logic like the following:
>
> - Start executing TextIO().Read, but before reading anything yet
> - Sleep for a random number of seconds between 5 and 15
> - Wake up
> - Read the records from the file (for the time-stamps) while TextIO().Read
> was asleep
> - Process records
> - Go back to putting TextIO().Read to sleep for a random number of seconds
> between 5 and 15, and continue until the end of the file is reached
>
> I appreciate your suggestions and/or if you can point me to an example.
> Cheers+thanks
> Amir-
>
>
>
>
>
