Note that KafkaIO lets you set your own watermark for each record.

On Fri, Jun 24, 2016 at 9:45 AM, Raghu Angadi <[email protected]> wrote:
> So the main question here is how one can stop the unbounded pipeline at
> runtime.
>
> You can emit a special watermark (Long.MAX_VALUE) that will flush the
> entire pipeline, so everything in flight gets processed. I am not sure
> whether that also makes the runner stop reading from the source; I would
> like to know. After that, I don't know if p.run() actually returns.
>
> On Thu, Jun 23, 2016 at 4:02 PM, Jesse Anderson <[email protected]>
> wrote:
>
>> No code example that I know of. Look over the bounded read code in
>> KafkaIO. Use that as a base.
>>
>> On Thu, Jun 23, 2016, 3:57 PM amir bahmanyari <[email protected]>
>> wrote:
>>
>>> Thanks Jesse.
>>> Any KafkaIO code example that detects the end of file, please?
>>> Thanks
>>>
>>>
>>> ------------------------------
>>> *From:* Jesse Anderson <[email protected]>
>>> *To:* amir bahmanyari <[email protected]>; "[email protected]" <[email protected]>
>>> *Sent:* Thursday, June 23, 2016 3:39 PM
>>> *Subject:* Re: End-of-data indicator in Unbounded KafkaIO
>>>
>>> You bound on an end-of-file message that you emit at the producer. The
>>> consumer or KafkaIO read would then continue to read until the end-of-file
>>> message is reached. The number in the read method is arbitrary. You would
>>> write your own.
>>>
>>> On Thu, Jun 23, 2016, 3:34 PM amir bahmanyari <[email protected]>
>>> wrote:
>>>
>>> Thanks Jesse.
>>> I know bounded should do it. But bounded gets tricky when you don't know
>>> how many records you may have in the data file.
>>> There is an upper bound, but what if there are more records than the
>>> upper bound?
>>> I can set a counter in memory and check its value. But I need a
>>> way to interrupt p.run().
>>> Not sure if there is something like this in the Beam API...
>>> I appreciate other folks' opinions on this topic as well.
>>> Thanks again.
>>>
>>> ------------------------------
>>> *From:* Jesse Anderson <[email protected]>
>>> *To:* amir bahmanyari <[email protected]>; "[email protected]" <[email protected]>
>>> *Sent:* Thursday, June 23, 2016 3:26 PM
>>> *Subject:* Re: End-of-data indicator in Unbounded KafkaIO
>>>
>>> You could make a bounded KafkaIO read and wait for an end-of-file message.
>>> That said, I don't know if Kafka is the right technology for what you're
>>> trying to do. You might just process the files directly at that point.
>>>
>>> On Thu, Jun 23, 2016, 3:10 PM amir bahmanyari <[email protected]>
>>> wrote:
>>>
>>> Sorry colleagues.
>>> I know "End-of-data" and Unbounded don't go hand in hand.
>>> Let's say I am invoking KafkaIO unbounded.
>>> But at some point I run out of streaming data (finite number of records
>>> in my data file) and p.run() keeps running/waiting for more data and
>>> of course doesn't terminate.
>>> How do I know there has not been any more data coming to KafkaIO.read()
>>> for a given amount of time, or detect any other runtime indicator?
>>> Is there a way to interrupt p.run() upon detecting such an indicator so
>>> the execution can move on with the rest of the code?
>>> Thanks+regards
>>> Amir
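For reference, Jesse's end-of-file-marker approach and Amir's "no data for a given amount of time" indicator can both be sketched in plain Java. This is a minimal sketch, not KafkaIO code: a `BlockingQueue` stands in for the Kafka topic, and the class `EndOfDataReader` and the marker string `__END_OF_DATA__` are hypothetical. The reader stops on whichever comes first, the producer's marker or an idle timeout.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-Java sketch: a BlockingQueue stands in for the Kafka topic.
// This is NOT KafkaIO or the Kafka consumer API.
final class EndOfDataReader {
    // Hypothetical marker the producer sends after its last real record.
    static final String END_OF_DATA = "__END_OF_DATA__";

    private EndOfDataReader() {}

    // Reads records until the end-of-data marker arrives or until no record
    // has arrived for idleTimeout. The marker itself is not returned.
    static List<String> readUntilDone(BlockingQueue<String> topic, Duration idleTimeout) {
        List<String> records = new ArrayList<>();
        while (true) {
            String record;
            try {
                // poll() waits at most idleTimeout and returns null if nothing arrived.
                record = topic.poll(idleTimeout.toMillis(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                break;
            }
            if (record == null) {
                break; // idle timeout: only a heuristic "probably done"
            }
            if (END_OF_DATA.equals(record)) {
                break; // explicit marker: the producer says it is done
            }
            records.add(record);
        }
        return records;
    }

    public static void main(String[] args) {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();
        // Simulated producer: a finite data file followed by the marker.
        topic.add("record-1");
        topic.add("record-2");
        topic.add(END_OF_DATA);
        System.out.println(readUntilDone(topic, Duration.ofMillis(500))); // prints [record-1, record-2]
    }
}
```

The marker is the reliable signal; the idle timeout is only a heuristic, since a slow producer looks the same as a finished one. Inside Beam, Jesse's suggestion is to build this check into your own read, using KafkaIO's bounded read code as a base.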

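Raghu's idea, together with the note that KafkaIO lets you set your own watermark for each record, boils down to: report each record's event time as the watermark, and jump to Long.MAX_VALUE once the producer's end-of-data marker arrives. The plain-Java sketch below shows only that logic. It assumes records carry an epoch-millis event time; the names `EndOfDataWatermark`, `Rec`, `Tracker`, and the marker string are hypothetical, and this is not KafkaIO's actual watermark hook.

```java
// Plain-Java sketch of per-record watermark logic; NOT the KafkaIO API.
final class EndOfDataWatermark {
    // Hypothetical marker the producer sends after its last real record.
    static final String END_OF_DATA = "__END_OF_DATA__";

    private EndOfDataWatermark() {}

    // Hypothetical record shape: a payload plus its event time in epoch millis.
    static final class Rec {
        final String value;
        final long eventTimeMillis;

        Rec(String value, long eventTimeMillis) {
            this.value = value;
            this.eventTimeMillis = eventTimeMillis;
        }
    }

    // Per-record watermark: the record's event time, or Long.MAX_VALUE
    // ("no more data, ever") when the end-of-data marker arrives.
    static long watermarkFor(Rec r) {
        return END_OF_DATA.equals(r.value) ? Long.MAX_VALUE : r.eventTimeMillis;
    }

    // Source-level watermark: a running maximum, so it never moves backwards.
    static final class Tracker {
        private long watermark = Long.MIN_VALUE;

        void observe(Rec r) {
            watermark = Math.max(watermark, watermarkFor(r));
        }

        long current() {
            return watermark;
        }

        // True once the marker has been seen, i.e. conceptually the point
        // at which the whole pipeline could be flushed.
        boolean reachedEndOfData() {
            return watermark == Long.MAX_VALUE;
        }
    }

    public static void main(String[] args) {
        Tracker t = new Tracker();
        t.observe(new Rec("a", 1_000L));
        t.observe(new Rec("b", 500L)); // out of order: watermark stays at 1000
        System.out.println(t.current() + " " + t.reachedEndOfData()); // prints 1000 false
        t.observe(new Rec(END_OF_DATA, 2_000L));
        System.out.println(t.reachedEndOfData()); // prints true
    }
}
```

Taking a running maximum keeps the watermark monotonic; a real source would usually hold it back a little to tolerate out-of-order records. In Beam itself the end-of-time value is `BoundedWindow.TIMESTAMP_MAX_VALUE`, and, as Raghu says, it is not clear whether reaching it also stops the runner from reading or makes `p.run()` return.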