The behavior of a runner with regard to source invocation when the source emits the maximum watermark is currently runner-defined. (Nit: the maximum timestamp is BoundedWindow.TIMESTAMP_MAX_VALUE, which is Long.MAX_VALUE in microseconds since the epoch, not milliseconds as might be assumed.) Emitting it will cause watermark-based timers for the Global Window to fire, and any further input elements should be considered droppably late. The DirectRunner shuts down by default if it reaches this state, but runners are not required to shut down once all watermarks reach this value.
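To make the unit point concrete, here is a small sketch in plain Java with no Beam dependency. It assumes the constant is derived the way BoundedWindow does it in the Beam Java SDK, i.e. by converting Long.MAX_VALUE microseconds to milliseconds (Beam's Joda-Time Instant counts milliseconds); verify against the SDK version you use. The class and method names are made up for illustration.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

public class MaxTimestampUnits {
    // Largest timestamp expressed as Long.MAX_VALUE microseconds since the
    // epoch, converted to milliseconds (the unit a millisecond Instant expects).
    // Assumption: this mirrors how BoundedWindow.TIMESTAMP_MAX_VALUE is derived.
    static long maxTimestampMillis() {
        return TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE);
    }

    public static void main(String[] args) {
        long millis = maxTimestampMillis();
        System.out.println("max timestamp in millis: " + millis);
        System.out.println("as an instant:           " + Instant.ofEpochMilli(millis));
        // Reading Long.MAX_VALUE directly as milliseconds overshoots by this factor.
        System.out.println("overshoot factor:        " + (Long.MAX_VALUE / millis));
    }
}
```

Treating Long.MAX_VALUE itself as a millisecond count would place the "maximum" a factor of 1000 beyond the value the SDK actually uses.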
On Fri, Jun 24, 2016 at 9:46 AM, Raghu Angadi <[email protected]> wrote:

> Note that KafkaIO lets you set your own watermark for each record.
>
> On Fri, Jun 24, 2016 at 9:45 AM, Raghu Angadi <[email protected]> wrote:
>
>> So the main question here is how one can stop the unbounded pipeline at
>> runtime.
>>
>> You can emit a special watermark (Long.MAX_VALUE) that will flush the
>> entire pipeline, and everything will be processed. Whether that also makes
>> the runner stop reading from the source, I am not sure; I would like to
>> know. After that, I don't know if p.run() actually returns.
>>
>> On Thu, Jun 23, 2016 at 4:02 PM, Jesse Anderson <[email protected]>
>> wrote:
>>
>>> No code example that I know of. Look over the bounded read code in
>>> KafkaIO. Use that as a base.
>>>
>>> On Thu, Jun 23, 2016, 3:57 PM amir bahmanyari <[email protected]>
>>> wrote:
>>>
>>>> Thanks Jesse.
>>>> Any KafkaIO code example that detects the end of file, please?
>>>> Thanks
>>>>
>>>> ------------------------------
>>>> *From:* Jesse Anderson <[email protected]>
>>>> *To:* amir bahmanyari <[email protected]>; "[email protected]" <[email protected]>
>>>> *Sent:* Thursday, June 23, 2016 3:39 PM
>>>> *Subject:* Re: End-of-data indicator in Unbounded KafkaIO
>>>>
>>>> You bound on an end-of-file message you emit at the producer. So the
>>>> consumer or KafkaIO read would continue to read until an end-of-file
>>>> message is reached. The number in the read method is arbitrary. You would
>>>> write your own.
>>>>
>>>> On Thu, Jun 23, 2016, 3:34 PM amir bahmanyari <[email protected]>
>>>> wrote:
>>>>
>>>> Thanks Jesse.
>>>> I know bounded should do it. But bounded gets tricky when you don't
>>>> know how many records you may have in the data file.
>>>> There is an upper bound, but what if there are more records than the
>>>> upper bound?
>>>> I can set a counter in memory and check its value. But I need a
>>>> way to interrupt p.run().
>>>> Not sure if there is something like this in the Beam API...
>>>> I appreciate other folks' opinions on this topic as well.
>>>> Thanks again.
>>>>
>>>> ------------------------------
>>>> *From:* Jesse Anderson <[email protected]>
>>>> *To:* amir bahmanyari <[email protected]>; "[email protected]" <[email protected]>
>>>> *Sent:* Thursday, June 23, 2016 3:26 PM
>>>> *Subject:* Re: End-of-data indicator in Unbounded KafkaIO
>>>>
>>>> You could make a bounded KafkaIO read and wait for an end-of-file
>>>> message. That said, I don't know if Kafka is the right technology for
>>>> what you're trying to do. You might just process the files directly at
>>>> that point.
>>>>
>>>> On Thu, Jun 23, 2016, 3:10 PM amir bahmanyari <[email protected]>
>>>> wrote:
>>>>
>>>> Sorry, colleagues.
>>>> I know "end-of-data" and unbounded don't go hand in hand.
>>>> Let's say I am invoking KafkaIO unbounded.
>>>> But at some point I run out of streaming data (there is a finite number
>>>> of records in my data file), and p.run() keeps running/waiting for more
>>>> data and, of course, doesn't terminate.
>>>> How do I know that no more data has been coming into KafkaIO.read() for
>>>> a given amount of time, or detect any other runtime indicator?
>>>> Is there a way to interrupt p.run() upon detecting such an indicator so
>>>> the execution can move on with the rest of the code?
>>>> Thanks+regards
>>>> Amir
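Two of the suggestions in this thread, an explicit end-of-file marker emitted by the producer and an idle timeout ("no data for a given amount of time"), can be combined in one consumer-side loop. The sketch below is plain Java and does not use KafkaIO: a BlockingQueue stands in for the stream of records, and the marker string END_OF_DATA, the StopReason enum and the timeout values are hypothetical choices. In a Beam pipeline this decision would more plausibly live inside the source; for example, a custom unbounded reader might report BoundedWindow.TIMESTAMP_MAX_VALUE from getWatermark() once it has seen the marker. That wiring is an assumption, not something KafkaIO is known to do out of the box.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class EndOfDataReader {
    // Hypothetical marker the producer writes as its last record.
    static final String END_OF_DATA = "END_OF_DATA";

    enum StopReason { MARKER, IDLE_TIMEOUT }

    /**
     * Moves records from the queue into out until either the end-of-data
     * marker arrives or no record shows up within idleTimeoutMillis.
     * Records queued after the marker are left untouched.
     */
    static StopReason readUntilEnd(BlockingQueue<String> records,
                                   long idleTimeoutMillis,
                                   List<String> out) throws InterruptedException {
        while (true) {
            // poll() returns null if nothing arrives before the timeout expires.
            String record = records.poll(idleTimeoutMillis, TimeUnit.MILLISECONDS);
            if (record == null) {
                return StopReason.IDLE_TIMEOUT;
            }
            if (END_OF_DATA.equals(record)) {
                return StopReason.MARKER;
            }
            out.add(record);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> withMarker = new LinkedBlockingQueue<>(
                List.of("r1", "r2", END_OF_DATA, "ignored"));
        List<String> seen = new ArrayList<>();
        // prints: MARKER [r1, r2]
        System.out.println(readUntilEnd(withMarker, 100, seen) + " " + seen);

        BlockingQueue<String> noMarker = new LinkedBlockingQueue<>(List.of("r1"));
        List<String> seen2 = new ArrayList<>();
        // prints: IDLE_TIMEOUT [r1]
        System.out.println(readUntilEnd(noMarker, 100, seen2) + " " + seen2);
    }
}
```

An idle timeout is only a heuristic: a slow or paused producer trips it as easily as a finished one, which is why the explicit marker is the more reliable of the two signals.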
