Fully agree, Juho! Do you want to contribute the docs fix? If so, we should update FLINK-5479 to make sure that the warning is removed once the bug is fixed.
Thanks, Fabian

2018-04-12 9:32 GMT+02:00 Juho Autio <juho.au...@rovio.com>:
> Looks like the bug https://issues.apache.org/jira/browse/FLINK-5479 is entirely preventing this feature from being used if there are any idle partitions. It would be nice to mention in the documentation that this currently requires all subscribed partitions to have a constant stream of data with growing timestamps. When the watermark gets stalled on an idle partition, it blocks everything.
>
> Link to current documentation:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>
> On Mon, Dec 4, 2017 at 4:29 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> You are right, offsets cannot be used for tracking processing progress. I think setting Kafka offsets with respect to some progress notion other than "has been consumed" would be highly application-specific and hard to generalize.
>> As you said, there might be a window (such as a session window) that is open much longer than all other windows and which would hold back the offset. Other applications might not use the built-in windows at all but custom ProcessFunctions.
>>
>> Have you considered tracking progress using watermarks?
>>
>> 2017-12-04 14:42 GMT+01:00 Juho Autio <juho.au...@rovio.com>:
>>
>>> Thank you, Fabian. Really clear explanation. That matches my observation indeed (data is not dropped from either the small or the big topic, but the offsets are already advancing on the Kafka side before those offsets have been triggered from a window operator).
>>>
>>> This means that it's a bit harder to meaningfully monitor the job's progress solely based on Kafka consumer offsets. Is there a reason why Flink couldn't instead commit the offsets after they have been triggered from downstream windows?
>>> I could imagine that this might pose a problem if there are any windows that remain open for a very long time, but in general it would be useful IMHO. Or Flink could even commit both (read vs. triggered) offsets to Kafka for monitoring purposes.
>>>
>>> On Mon, Dec 4, 2017 at 3:30 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Juho,
>>>>
>>>> the partitions of both topics are consumed independently, i.e., each at its own speed without coordination. With the configuration that Gordon linked, watermarks are generated per partition.
>>>> Each source task maintains the latest (and highest) watermark per partition and propagates the smallest of these watermarks. The same mechanism is applied for watermarks across tasks (this is what Kien referred to).
>>>>
>>>> In the case that you are describing, the partitions of the smaller topic are consumed faster (hence their offsets catch up sooner), but watermarks are emitted "at the speed" of the bigger topic. Therefore, the timestamps of records from the smaller topic can be far ahead of the watermark.
>>>> In principle, that does not pose a problem. Stateful operators (such as windows) remember the "early" records and process them when they receive a watermark that passes the timestamps of the early records.
>>>>
>>>> Regarding your question "Are they committed to Kafka before their watermark has passed on Flink's side?":
>>>> The offsets of the smaller topic might be checkpointed when all of its partitions have been read to the "end" while the bigger topic is still catching up.
>>>> The watermarks are moving at the speed of the bigger topic, but all "early" events of the smaller topic are stored in stateful operators and are checkpointed as well.
>>>>
>>>> So, you lose neither early nor late data.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2017-12-01 13:43 GMT+01:00 Juho Autio <juho.au...@rovio.com>:
>>>>
>>>>> Thanks for the answers, but I still don't understand why I can see the offsets being quickly committed to Kafka for the "small topic". Are they committed to Kafka before their watermark has passed on Flink's side? That would be quite confusing... Indeed, when Flink handles the state/offsets internally, the consumer offsets are committed to Kafka just for reference.
>>>>>
>>>>> Otherwise, what you're saying sounds very good to me. The documentation just doesn't explicitly say anything about how it works across topics.
>>>>>
>>>>> On Kien's answer: "When you join multiple stream with different watermarks", note that I'm not joining any topics myself; I get them as a single stream from the Flink Kafka consumer based on the list of topics that I asked for.
>>>>>
>>>>> Thanks,
>>>>> Juho
>>>>>
>>>>> On Wed, Nov 22, 2017 at 2:57 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>>>>>
>>>>>> Hi!
>>>>>>
>>>>>> The FlinkKafkaConsumer can handle watermark advancement with per-Kafka-partition awareness (across partitions of different topics). You can see an example of how to do that here [1].
>>>>>>
>>>>>> Basically, it generates watermarks within the Kafka consumer individually for each Kafka partition, and the per-partition watermarks are aggregated and emitted from the consumer in the same way that watermarks are aggregated on a stream shuffle: a watermark is emitted from the consumer only when the low watermark advances across all partitions.
>>>>>>
>>>>>> Therefore, this helps avoid the problem that you described, in which a "big_topic" has subscribed partitions that lag behind others.
>>>>>> In this case, and when the above feature is used, the event time would advance along with the lagging "big_topic" partitions and would not result in messages being recognized as late and discarded.
>>>>>>
>>>>>> Cheers,
>>>>>> Gordon
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition
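The per-partition watermark behaviour that Gordon and Fabian describe in this thread, and the idle-partition stall reported in FLINK-5479, can be illustrated with a small toy model. This is a sketch under stated assumptions, not Flink code: the class `PerPartitionWatermarks`, its methods, and the `max_out_of_orderness` parameter are hypothetical names. In a real job the per-partition logic is enabled by passing a timestamp/watermark assigner to the consumer's `assignTimestampsAndWatermarks(...)`, as in the documentation linked in [1], and Flink emits periodic watermarks on a timer rather than after every record; the model updates per record only for simplicity.

```python
"""Toy model (not Flink code) of per-partition watermark aggregation inside a
single Kafka source task, following the description in the thread above."""
from typing import Dict, Iterable, Optional, Tuple

# A partition is identified by (topic, partition number); hypothetical alias.
Partition = Tuple[str, int]


class PerPartitionWatermarks:
    """Keeps the highest watermark seen per partition and emits the minimum
    across all assigned partitions, only when that minimum advances."""

    def __init__(self, partitions: Iterable[Partition], max_out_of_orderness: int = 0):
        # Assumption: a bounded out-of-orderness watermark per partition.
        self.max_ooo = max_out_of_orderness
        # None means "no record seen yet" for that partition.
        self.per_partition: Dict[Partition, Optional[int]] = {p: None for p in partitions}
        # Last watermark emitted downstream by this task.
        self.emitted: Optional[int] = None

    def on_record(self, partition: Partition, timestamp: int) -> Optional[int]:
        """Update one partition's watermark; return the task's new watermark
        if it advanced, otherwise None."""
        candidate = timestamp - self.max_ooo
        current = self.per_partition[partition]
        # Per-partition watermarks never move backwards.
        if current is None or candidate > current:
            self.per_partition[partition] = candidate
        return self._maybe_emit()

    def _maybe_emit(self) -> Optional[int]:
        values = list(self.per_partition.values())
        # A partition without any watermark yet holds back the whole task; a
        # partition that goes idle holds it at its last value (cf. FLINK-5479).
        if any(v is None for v in values):
            return None
        low = min(values)
        if self.emitted is None or low > self.emitted:
            self.emitted = low
            return low
        return None


if __name__ == "__main__":
    wm = PerPartitionWatermarks([("small_topic", 0), ("big_topic", 0)])
    print(wm.on_record(("small_topic", 0), 1000))  # None: big_topic has no watermark yet
    print(wm.on_record(("big_topic", 0), 100))     # 100: minimum over both partitions
    print(wm.on_record(("small_topic", 0), 5000))  # None: still held at 100 by big_topic
    print(wm.on_record(("big_topic", 0), 300))     # 300
```

In this model the record from `small_topic` with timestamp 5000 is an "early" record in Fabian's sense: a downstream window would keep it in state until the watermark passes 5000. If `big_topic` stops producing after timestamp 300, the task watermark stays at 300 no matter how far `small_topic` advances, which is the stall Juho describes.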