You have a great fundamental point, Lukasz :)) But I don't have any other choice
but to run it in Beam on Flink & Spark for benchmarking purposes. Similar
research has been done on Storm & Streams, and we want to compare the Flink & Spark
engines under Beam. THIS PROBLEM was used in the previous work, so
ultimately we need to compare apples & apples. That's why :)

I wrote a little DataProvider that sends the records according to the delayed
logic I have below. I send them to KafkaIO(). I have not tested it yet. Kinda
late, wanna go home. Will start it from home this eve. Maybe the "delay" will
make a magic difference... will see...

But without these delays, in the past I always got redundant processing under
KafkaIO(), as I described in my earlier questions on that issue in this forum.
That's why I turned to TextIO(), which works fine. However, according to
everyone's suggestions here, I can't build the Beam pipeline & include this
delay+read step. So I am back to KafkaIO()... perhaps with more questions in the
near future...

Thanks for your help.
Cheers
Amir-

      From: Lukasz Cwik <[email protected]>
 To: [email protected] 
Cc: amir bahmanyari <[email protected]>
 Sent: Wednesday, August 24, 2016 4:13 PM
 Subject: Re: TextIO().Read pipeline implementation question
   
Amir, it seems like you're attempting to build a network simulation
(https://en.wikipedia.org/wiki/Network_simulation). Are you sure Apache Beam is
the right tool for this?
On Wed, Aug 24, 2016 at 3:54 PM, Thomas Groh <[email protected]> wrote:

The Beam model generally is agnostic to the rate at which elements are produced 
and consumed. Instead, it uses the concept of a watermark to provide a 
completion metric, and element timestamps to record when an event happened 
(which is independent of when the event was processed). Your pipeline should be 
correct regardless of the input rate by using the (data-based) timestamp of 
arriving elements instead of the time they arrived in the Pipeline. This allows 
you to describe the output of your Pipeline in terms of the input records 
(which have associated timestamps) rather than the rate at which input arrived. 
You can assign timestamps to an existing PCollection using the 'WithTimestamps' 
PTransform, or create a new PCollection where elements have associated 
timestamps using the 'Create.timestamped()' PTransform. Some sources will also 
output elements with a Timestamp already associated with the element (e.g. 
KafkaIO or PubSubIO).
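
To illustrate the point above about data-based timestamps, here is a minimal plain-Java sketch. It does not use the Beam API; the class `EventTimeWindows`, its `Event` type, and the method names are hypothetical and exist only for this example. It shows that assigning records to fixed windows by the timestamp they carry yields the same windows no matter when, or in what order, the records arrive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java illustration (NOT the Beam API) of grouping records by event time. */
final class EventTimeWindows {

    /** A record that carries its own data-based timestamp, in seconds. */
    static final class Event {
        final long eventTimeSec;
        final String payload;

        Event(long eventTimeSec, String payload) {
            this.eventTimeSec = eventTimeSec;
            this.payload = payload;
        }
    }

    /** Start (in seconds) of the fixed window that contains the given timestamp. */
    static long windowStart(long eventTimeSec, long windowSizeSec) {
        return Math.floorDiv(eventTimeSec, windowSizeSec) * windowSizeSec;
    }

    /** Groups payloads by window start; arrival time plays no role in the grouping. */
    static Map<Long, List<String>> assign(List<Event> arrivals, long windowSizeSec) {
        Map<Long, List<String>> windows = new TreeMap<>();
        for (Event e : arrivals) {
            windows.computeIfAbsent(windowStart(e.eventTimeSec, windowSizeSec),
                                    k -> new ArrayList<>())
                   .add(e.payload);
        }
        return windows;
    }

    public static void main(String[] args) {
        // Records arrive out of order; window membership depends only on eventTimeSec.
        List<Event> arrivals = Arrays.asList(
            new Event(31, "car-2"), new Event(0, "car-1"), new Event(59, "car-3"));
        System.out.println(assign(arrivals, 30));
    }
}
```

In an actual pipeline the same idea is expressed with the transforms named above rather than with hand-written grouping.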

If the sole desire is to rate limit your input, using
CountingInput.unbounded().withRate(Duration) will output elements at a
continuous rate to your downstream PCollection. This will output elements over
time in such a way that the desired rate is reached.
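
As a rough illustration of what "output elements over time so that the desired rate is reached" means, here is a plain-Java sketch of the pacing arithmetic. It is not how CountingInput is implemented and does not use the Beam API; the class `RatePacer` and its method are hypothetical names for this example only.

```java
/** Plain-Java pacing sketch (NOT the Beam API, NOT CountingInput's implementation). */
final class RatePacer {

    /**
     * Offset in milliseconds from the start at which element number {@code index}
     * becomes due, for a target of {@code elementsPerPeriod} elements every
     * {@code periodMillis} milliseconds.
     */
    static long dueOffsetMillis(long index, long elementsPerPeriod, long periodMillis) {
        return index * periodMillis / elementsPerPeriod;
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // Emit 5 elements at a target rate of 10 elements per second.
        for (long i = 0; i < 5; i++) {
            long waitMillis = start + dueOffsetMillis(i, 10, 1000) - System.currentTimeMillis();
            if (waitMillis > 0) {
                Thread.sleep(waitMillis); // wait until this element is due
            }
            System.out.println("element " + i);
        }
    }
}
```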
On Wed, Aug 24, 2016 at 3:34 PM, amir bahmanyari <[email protected]> wrote:

Thanks for your response, Ben. The sleep+read is part of the problem solution
requirements. I know what you mean by why not process them immediately. The
problem solution intentionally slows down processing to simulate the traffic on
expressway(s). The assumption is that each car emits a "record" every 30
seconds. To make a long story short, at runtime, the behavior I provided below
is expected to be implemented to provide an accurate simulation.

So let's say I want to artificially inject a Sleep(random-seconds) in the
pipeline before ParDo actually gets into the action. What are the options to do
that? And, using TextIO(), how can I buffer the records read by TextIO() while
Sleep() is in progress?

Thanks for your valuable time.

      From: Ben Chambers <[email protected]>
 To: [email protected] ; amir bahmanyari <[email protected]> 
 Sent: Wednesday, August 24, 2016 3:24 PM
 Subject: Re: TextIO().Read pipeline implementation question
   
I think the most important question is why do you want to slow down the reads
like that? If this is for testing purposes, there may be other options, such as
test-specific sources.

At a high level, the process you describe sounds somewhat like an Unbounded
Source, or perhaps an application of the not-yet-built Splittable DoFn
(https://s.apache.org/splittable-do-fn).

Even in those cases, "reading 100 records and then sleeping" is normally
undesirable because it limits the throughput of the pipeline. If there were
1000 records waiting to be processed, why not process them?

In general, a given step doesn't "submit elements to the next step". It just
outputs the elements. This is important since there may be two steps that read
from that PCollection, meaning that there isn't a single ParDo to submit the
elements to.

-- Ben
On Wed, Aug 24, 2016 at 3:12 PM amir bahmanyari <[email protected]> wrote:

Hi Dan,
Thanks so much for your response. Let's focus on your "The other side"
section below. I provided the target process I am trying to implement in my
first email below. According to your "runners do not expose hooks to control how
often they read records," it looks like I am out of luck achieving that on a
random basis. So I am trying to articulate an equivalent read/process as close
as possible to what I want.

From the "- Wake-up" step in my algorithm, I should be able to read records,
but no more than 100. Let's say I sleep for 150 milliseconds, wake up, read 100
records all at once, and submit them to a ParDo DoFn to process one by one.
What would that pipeline implementation look like? Is there an example that
shows how to "sleep 150 ms" in a pipeline, then read n records (e.g., 100) at
once, and then submit them to ParDo to process one by one, please?

I have tried so many ways to implement it but keep getting weird compilation
errors... I appreciate your help.
Amir-

      From: Dan Halperin <[email protected]>
 To: [email protected] ; amir bahmanyari <[email protected]> 
 Sent: Wednesday, August 24, 2016 1:42 PM
 Subject: Re: TextIO().Read pipeline implementation question
 Hi Amir,
It is very hard to respond without sufficient details to reproduce. Can you 
please send a full pipeline that we can test with test data (e.g., the LICENSE 
file), including pipeline options (which runner, etc.)?
The other side -- in general, runners do not expose hooks to control how often 
they read records. If you have something like TextIO.Read | ParDo.of(sleep for 
1s) you will get 1s sleep per record, but you cannot control how this is 
interleaved with reading. A runner is free to read all the records before 
sleeping, read one record and sleep in a loop, and everything in between.
Thanks,
Dan

On Tue, Aug 23, 2016 at 5:07 PM, amir bahmanyari <[email protected]> wrote:

So here is what happened as a result of inserting a Window of random seconds of
buffering between my TextIO().Read & DoFn<>: the number of records processed got
doubled :-(( Why is that? Could someone shed light on this, please? I
appreciate it very much.
Thanks.
Amir-



    From: amir bahmanyari <[email protected]>
 To: "[email protected]. org" <[email protected]. org> 
 Sent: Tuesday, August 23, 2016 4:40 PM
 Subject: Re: TextIO().Read pipeline implementation question
 

Would this implementation work? I am thinking of buffering records within a
window of a random number of seconds, processing each record in the DoFn, and
then repeating with another window of random length:


p.apply(TextIO.Read.from("/tmp/LRData.dat"))
 .apply(Window.<String>into(
     FixedWindows.of(Duration.standardSeconds((int) (((15 - 5) * r.nextDouble()) + 5)))))
 .apply("PseduLRDoFn", ParDo.of(new DoFn<String, String>() {

Thanks for your help.Amir-



    From: amir bahmanyari <[email protected]>
 To: "[email protected]. org" <[email protected]. org> 
 Sent: Tuesday, August 23, 2016 3:51 PM
 Subject: TextIO().Read pipeline implementation question
  

Hi Colleagues,
I have no problem reading through TextIO() & processing, all by
default behavior.

p.apply(TextIO.Read.from("/tmp/LRData.dat"))
 .apply("PseduLRDoFn", ParDo.of(new DoFn<String, String>() {

I want to change this logic like the following:
- Start executing TextIO().Read, but before reading anything yet
- Sleep for a random number of seconds between 5 & 15
- Wake up
- Read the records from the file (for the timestamps) that were due while
  TextIO().Read was asleep
- Process records
- Go back to putting TextIO() to sleep for a random number of seconds between
  5 & 15, and continue until the end of the file is reached

I appreciate your suggestions and/or if you can point me to an example.
Cheers + thanks
Amir-
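
The steps listed above can be sketched outside the pipeline as a plain-Java replay loop. This is only a hedged illustration, not Beam API and not the actual code from this thread: the class `DelayedBatchProvider`, the `batchSize` parameter, and the injected `sleeper` and `sink` callbacks are all hypothetical. The delay uses the same 5 to 15 second range as the steps above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.function.Consumer;
import java.util.function.LongConsumer;

/** Hypothetical stand-alone replay loop (NOT the Beam API): sleep, wake up, emit a batch, repeat. */
final class DelayedBatchProvider {

    /** Random delay in milliseconds, uniformly drawn from [5000, 15000). */
    static long randomDelayMillis(Random random) {
        return (long) ((((15 - 5) * random.nextDouble()) + 5) * 1000);
    }

    /**
     * Replays the records in batches. Before each batch the sleeper is invoked with a
     * random delay; the batch is then handed to the sink. Returns the number of batches.
     */
    static int replay(List<String> records, int batchSize, Random random,
                      LongConsumer sleeper, Consumer<List<String>> sink) {
        int batches = 0;
        for (int from = 0; from < records.size(); from += batchSize) {
            sleeper.accept(randomDelayMillis(random));              // "Sleep" step
            int to = Math.min(from + batchSize, records.size());    // "Wake up" and "Read" steps
            sink.accept(new ArrayList<>(records.subList(from, to))); // "Process" step
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("r1", "r2", "r3", "r4", "r5");
        // For a quick demo the sleeper only prints the delay instead of really sleeping.
        replay(records, 2, new Random(),
               ms -> System.out.println("sleep " + ms + " ms"),
               batch -> System.out.println("emit " + batch));
    }
}
```

In a real run the sleeper would call Thread.sleep and the sink would hand each batch to whatever feeds the pipeline, for example a message queue producer.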



   





   
