Hello Bill,

Just to add to what Max said previously about emitting watermarks based
on ingestion time at the Kafka source: there will soon be support for
event-time watermarks in Flink's Kafka source, and we believe that we can
also port this to Beam. This could help find a solution for your use case.
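To give a rough idea of the direction, here is a minimal sketch of how a
custom Beam UnboundedSource.UnboundedReader can derive its watermark from
record timestamps rather than ingestion time. MyRecord, its getEventTime()
accessor, and pollNextRecordFromKafka() are hypothetical placeholders, and
the one-minute out-of-orderness bound is arbitrary:

    import java.io.IOException;
    import java.util.NoSuchElementException;

    import org.apache.beam.sdk.io.UnboundedSource;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    // A reader that tracks the highest event time seen so far and reports
    // a watermark derived from it, instead of the current system time.
    class EventTimeKafkaReader extends UnboundedSource.UnboundedReader<MyRecord> {

      private MyRecord current;
      private Instant maxEventTime = BoundedWindow.TIMESTAMP_MIN_VALUE;

      @Override
      public boolean advance() throws IOException {
        current = pollNextRecordFromKafka(); // hypothetical helper
        if (current == null) {
          return false; // no record available right now
        }
        if (current.getEventTime().isAfter(maxEventTime)) {
          maxEventTime = current.getEventTime();
        }
        return true;
      }

      @Override
      public MyRecord getCurrent() throws NoSuchElementException {
        if (current == null) {
          throw new NoSuchElementException();
        }
        return current;
      }

      @Override
      public Instant getCurrentTimestamp() {
        // Each element carries its own event-time timestamp.
        return current.getEventTime();
      }

      @Override
      public Instant getWatermark() {
        // Tolerate a fixed bound of out-of-order records, here one minute.
        return maxEventTime.minus(Duration.standardMinutes(1));
      }

      // start(), getCheckpointMark(), getCurrentSource(), close(), and the
      // actual Kafka consumer plumbing are omitted for brevity.
    }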
For a more detailed description of what we are planning to integrate into
the Flink Kafka source, you can have a look here:
https://issues.apache.org/jira/browse/FLINK-3375?filter=-1

Cheers,
Kostas

> On Mar 30, 2016, at 8:13 PM, Bill McCarthy <[email protected]> wrote:
>
> Thanks Max,
>
> Your explanation of the difference between streaming and batch boiling
> down to the execution mode on the Flink runner is a good one, and has
> helped me to understand the ramifications better. Thanks for helping me
> clear that up in my mind.
>
> I also tend to agree with you and others on this thread that this is
> something that would ideally be deduced by the runner implementation, as
> a function of features of the DAG. On the other hand, setting this
> manually is certainly not a big deal for me.
>
> See my other email to the group, in response to Lukasz's question, for
> more details on my motivation for doing what I've described.
>
> Regarding your final question, I'm unclear on whether I need to have
> 1. a custom Kafka source which approximates event-time watermarking, or
> 2. allowed lateness in my windowing.
> I would have thought that I could achieve what I want with the latter,
> but your point has raised doubts about that.
>
> Thanks as always for your helpful response,
>
> Bill
>
>> On Mar 30, 2016, at 12:21 PM, Maximilian Michels <[email protected]> wrote:
>>
>> Hi Bill,
>>
>> By "streaming mode" I meant Beam's built-in flag in PipelineOptions to
>> indicate batch or streaming execution mode. Depending on this flag, the
>> Flink Runner translates to either Flink's DataSet (batch) or DataStream
>> (streaming) API. The DataStream API can also read from bounded
>> collections like file sources.
>>
>> Your mental model is by no means way off. Reading from bounded sources
>> in a streaming program can make sense. However, the typical use cases
>> we have seen for streaming in the past are to read from an unbounded
>> source (e.g. Kafka) and process all incoming records using event time
>> / windows / triggering as they arrive. This is a great advantage over
>> the so-called Lambda architecture, where you typically have an
>> approximate real-time streaming layer and a batch layer which does the
>> heavy lifting.
>>
>> The typical use case I see for reading from a bounded collection in
>> streaming is when you have static data (like an ML model or a stop
>> word list). These would typically be kept as state. On the other hand,
>> your use case would also work if you assigned timestamps and
>> watermarks correctly for the HDFS source. You can then union the
>> streams and process the resulting stream correctly according to time.
>> That is the beauty of event time.
>>
>> By default, the Flink Runner assigns ingestion time for Kafka. That
>> means that timestamps/watermarks are assigned according to the current
>> system time. This is probably not what you want when you want to
>> combine these two streams. You can change timestamps between operators,
>> but you can only assign watermarks at the source. Thus, to assign
>> watermarks from Kafka or a file source, you would have to implement a
>> custom Beam source.
>>
>> Cheers,
>> Max
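To make Max's last point above concrete: a ParDo downstream of the source
can re-stamp elements, but that does not move the watermark, which only
the source controls. A minimal sketch in Beam's Java SDK, where MyRecord
and its getEventTime() accessor are hypothetical placeholders:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;

    // Re-stamp each element with the event time carried in its payload.
    // Note: this changes element timestamps only; the watermark is still
    // the ingestion-time watermark emitted by the source, so lateness is
    // not judged as if the source were event-time aware.
    PCollection<MyRecord> restamped = input.apply("AssignEventTime",
        ParDo.of(new DoFn<MyRecord, MyRecord>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            MyRecord r = c.element();
            // Moving a timestamp backwards beyond the DoFn's allowed
            // timestamp skew will be rejected by the runner.
            c.outputWithTimestamp(r, r.getEventTime());
          }
        }));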
>> On Wed, Mar 30, 2016 at 4:37 PM, Bill McCarthy
>> <[email protected]> wrote:
>>> Thanks Max,
>>>
>>> I'm unsure what you mean by the "batch part" and "streaming
>>> execution"... Are you saying that I have to run my entire pipeline in
>>> either batch or streaming mode?
>>>
>>> Perhaps a brief description of what I'm trying to do would help:
>>>
>>> 1. I've got some historical intervalized timeseries data
>>> 2. I've also got some live intervalized timeseries data
>>> 3. I want to process both those flows of data in a uniform fashion:
>>>    calculating windowed statistics
>>> 4. My thought was that I'd have the historical data stored in some
>>>    data store that is easy for Beam to read (e.g. HDFS)
>>> 5. I'd put live data on Kafka
>>> 6. Then load up two PCollections, one from HDFS and one from Kafka
>>> 7. Then perform a Flatten transform to get them into one PCollection
>>> 8. Then run my windowing over the flattened PCollection
>>> 9. ...

(Steps 1-8 are sketched in code at the end of this message.)

>>> I like this approach because my transforms don't have to care about
>>> whether data is live or historical, and I can have one processing
>>> pipeline for both.
>>>
>>> Am I barking up the wrong tree? Is my mental model way off with
>>> respect to how to combine historical and live data?
>>>
>>> Thanks
>>>
>>> Bill
>>>
>>> On Mar 30, 2016, at 5:59 AM, Maximilian Michels <[email protected]> wrote:
>>>
>>> Hi Bill,
>>>
>>> The batch part of the Flink Runner supports reading from a finite
>>> collection, but I'm assuming you're working with the streaming
>>> execution of the Flink Runner. We haven't implemented support in the
>>> Runner yet, but Flink natively supports reading from finite sources,
>>> so it looks fairly easy to implement. I've filed a JIRA issue and
>>> would like to look into this later today.
>>>
>>> I've recently added a test case (UnboundedSourceITCase) which throws
>>> a custom exception when the end of the output has been reached.
>>> Admittedly, that is not a very nice approach, but it works. All the
>>> more reason to support finite sources in streaming mode.
>>>
>>> Best,
>>> Max
>>>
>>> On Wed, Mar 30, 2016 at 2:43 AM, William McCarthy
>>> <[email protected]> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I want to get access to a bounded PCollection in Beam on the Flink
>>>> runner. Ideally, I’d pull from HDFS. Is that supported? If not, what
>>>> would be the best way for me to get something bounded, for testing
>>>> purposes?
>>>>
>>>> Thanks,
>>>>
>>>> Bill
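For completeness, here is a rough sketch of the pipeline Bill describes in
steps 1-8 above, in Beam's Java SDK. HdfsIntervalSource, KafkaIntervalSource,
Interval, IntervalStats, and ComputeStatsFn are all hypothetical
placeholders, and, as discussed above, both sources would need to assign
event-time timestamps and watermarks for the windowing to line up:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.Read;
    import org.apache.beam.sdk.transforms.Combine;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionList;
    import org.joda.time.Duration;

    Pipeline p = Pipeline.create(options);

    // Steps 4-6: one collection from HDFS, one unbounded from Kafka.
    PCollection<Interval> historical =
        p.apply("ReadHistorical", Read.from(new HdfsIntervalSource()));
    PCollection<Interval> live =
        p.apply("ReadLive", Read.from(new KafkaIntervalSource()));

    // Step 7: flatten both flows into a single PCollection.
    PCollection<Interval> all =
        PCollectionList.of(historical).and(live)
            .apply(Flatten.<Interval>pCollections());

    // Step 8: identical event-time windowing for live and historical data.
    PCollection<IntervalStats> stats = all
        .apply(Window.<Interval>into(FixedWindows.of(Duration.standardMinutes(5))))
        .apply(Combine.globally(new ComputeStatsFn()).withoutDefaults());

Whether the bounded HDFS side can run to completion and then let the
watermark advance is exactly the finite-sources-in-streaming-mode gap Max
filed the JIRA issue for.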
