Great. I'd be happy to contribute. I added 2 sub-tasks in
https://issues.apache.org/jira/browse/FLINK-5479.

Could someone with the right privileges assign this sub-task to me:
https://issues.apache.org/jira/browse/FLINK-9183?

On Mon, Apr 16, 2018 at 3:14 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Fully agree Juho!
>
> Do you want to contribute the docs fix?
> If yes, we should update FLINK-5479 to make sure that the warning is
> removed once the bug is fixed.
>
> Thanks, Fabian
>
> 2018-04-12 9:32 GMT+02:00 Juho Autio <juho.au...@rovio.com>:
>
>> Looks like the bug https://issues.apache.org/jira/browse/FLINK-5479
>> entirely prevents this feature from being used if there are any idle
>> partitions. It would be nice for the documentation to mention that this
>> currently requires all subscribed partitions to have a constant stream of
>> data with growing timestamps. When the watermark gets stalled on an idle
>> partition, it blocks everything.
>>
>> Link to the current documentation:
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
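To illustrate the blocking behavior (a minimal self-contained sketch, not actual Flink code; the class and method names here are made up):

```java
import java.util.Arrays;

// Minimal sketch (not Flink code): the consumer forwards the minimum of the
// per-partition watermarks, so a single idle partition whose watermark never
// advances stalls the watermark emitted downstream.
public class LowWatermarkSketch {
    static long emittedWatermark(long[] perPartitionWatermarks) {
        return Arrays.stream(perPartitionWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Two partitions make progress, the third is idle and stuck at MIN_VALUE.
        long[] wm = {1_000L, 2_000L, Long.MIN_VALUE};
        System.out.println(emittedWatermark(wm)); // stays at Long.MIN_VALUE
    }
}
```

No matter how far the first two partitions progress, the emitted watermark cannot advance past the idle partition.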
>>
>> On Mon, Dec 4, 2017 at 4:29 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> You are right, offsets cannot be used for tracking processing progress.
>>> I think setting Kafka offsets with respect to some progress notion other
>>> than "has been consumed" would be highly application specific and hard to
>>> generalize.
>>> As you said, there might be a window (such as a session window) that is
>>> open much longer than all other windows and which would hold back the
>>> offset. Other applications might not use the built-in windows at all, but
>>> rather custom ProcessFunctions.
>>>
>>> Have you considered tracking progress using watermarks?
>>>
>>> 2017-12-04 14:42 GMT+01:00 Juho Autio <juho.au...@rovio.com>:
>>>
>>>> Thank you Fabian. Really clear explanation. That indeed matches my
>>>> observation (data is not dropped from either the small or the big topic,
>>>> but the offsets on the Kafka side advance before those offsets have been
>>>> triggered from a window operator).
>>>>
>>>> This means that it's a bit harder to meaningfully monitor the job's
>>>> progress based solely on Kafka consumer offsets. Is there a reason why
>>>> Flink couldn't instead commit the offsets only after they have been
>>>> triggered from downstream windows? I could imagine that this might pose a
>>>> problem if there are windows that remain open for a very long time, but
>>>> in general it would be useful IMHO. Or Flink could even commit both (read
>>>> vs. triggered) offsets to Kafka for monitoring purposes.
>>>>
>>>> On Mon, Dec 4, 2017 at 3:30 PM, Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Juho,
>>>>>
>>>>> the partitions of both topics are independently consumed, i.e., at
>>>>> their own speed without coordination. With the configuration that Gordon
>>>>> linked, watermarks are generated per partition.
>>>>> Each source task maintains the latest (and highest) watermark per
>>>>> partition and propagates the smallest watermark. The same mechanism is
>>>>> applied for watermarks across tasks (this is what Kien referred to).
>>>>>
>>>>> In the case that you are describing, the partitions of the smaller
>>>>> topic are consumed faster (hence their offsets advance sooner), but
>>>>> watermarks are emitted "at the speed" of the bigger topic.
>>>>> Therefore, the timestamps of records from the smaller topic can be
>>>>> much ahead of the watermark.
>>>>> In principle, that does not pose a problem. Stateful operators (such
>>>>> as windows) remember the "early" records and process them when they
>>>>> receive a watermark that passes the timestamps of the early records.
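A minimal sketch of that buffering behavior (not Flink internals; the class and method names here are invented for illustration):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch (not Flink internals): a stateful operator keeps "early"
// records in state and fires them only once a watermark passes their
// timestamps; until then they are simply buffered, not dropped.
public class EarlyRecordBuffer {
    private final List<Long> pending = new ArrayList<>();

    void add(long timestamp) {
        pending.add(timestamp);
    }

    // Fires (and removes from state) every buffered record whose timestamp
    // the given watermark has passed.
    List<Long> onWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        pending.removeIf(ts -> {
            if (ts <= watermark) {
                fired.add(ts);
                return true;
            }
            return false;
        });
        return fired;
    }
}
```

Records ahead of the watermark stay in state (and are included in checkpoints), which is why neither early nor late data is lost.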
>>>>>
>>>>> Regarding your question "Are they committed to Kafka before their
>>>>> watermark has passed on Flink's side?":
>>>>> The offsets of the smaller topic might be checkpointed when all
>>>>> partitions have been read to the "end" and the bigger topic is still
>>>>> catching up.
>>>>> The watermarks are moving at the speed of the bigger topic, but all
>>>>> "early" events of the smaller topic are stored in stateful operators and
>>>>> are checkpointed as well.
>>>>>
>>>>> So, you lose neither early nor late data.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>>
>>>>>
>>>>> 2017-12-01 13:43 GMT+01:00 Juho Autio <juho.au...@rovio.com>:
>>>>>
>>>>>> Thanks for the answers. I still don't understand why I can see the
>>>>>> offsets being quickly committed to Kafka for the "small topic". Are
>>>>>> they committed to Kafka before their watermark has passed on Flink's
>>>>>> side? That would be quite confusing. Indeed, when Flink handles the
>>>>>> state/offsets internally, the consumer offsets are committed to Kafka
>>>>>> just for reference.
>>>>>>
>>>>>> Otherwise, what you're saying sounds very good to me. The
>>>>>> documentation just doesn't explicitly say anything about how it works
>>>>>> across topics.
>>>>>>
>>>>>> On Kien's answer: "When you join multiple stream with different
>>>>>> watermarks", note that I'm not joining any topics myself, I get them as a
>>>>>> single stream from the Flink kafka consumer based on the list of topics
>>>>>> that I asked for.
>>>>>>
>>>>>> Thanks,
>>>>>> Juho
>>>>>>
>>>>>> On Wed, Nov 22, 2017 at 2:57 PM, Tzu-Li (Gordon) Tai <
>>>>>> tzuli...@apache.org> wrote:
>>>>>>
>>>>>>> Hi!
>>>>>>>
>>>>>>> The FlinkKafkaConsumer can handle watermark advancement with
>>>>>>> per-Kafka-partition awareness (across partitions of different
>>>>>>> topics).
>>>>>>> You can see an example of how to do that here [1].
>>>>>>>
>>>>>>> Basically, this generates watermarks within the Kafka consumer
>>>>>>> individually for each Kafka partition, and the per-partition
>>>>>>> watermarks are aggregated and emitted from the consumer in the same
>>>>>>> way that watermarks are aggregated on a stream shuffle: a watermark
>>>>>>> is emitted from the consumer only when the low watermark across all
>>>>>>> partitions advances.
>>>>>>>
>>>>>>> Therefore, this helps avoid the problem that you described, in which
>>>>>>> the "big_topic" has subscribed partitions that lag behind others. In
>>>>>>> this case, when the above feature is used, event time advances along
>>>>>>> with the lagging "big_topic" partitions, and messages are not
>>>>>>> recognized as late and discarded.
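The aggregation Gordon describes can be sketched roughly like this (a simplified model, not the actual FlinkKafkaConsumer implementation; class and method names are made up):

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the aggregation described above (not the actual
// FlinkKafkaConsumer implementation): each partition's watermark is tracked
// separately, and a new watermark is emitted only when the minimum across
// all tracked partitions advances.
public class PerPartitionWatermarks {
    private final Map<String, Long> partitionWatermarks = new HashMap<>();
    private long lastEmitted = Long.MIN_VALUE;

    // Returns the newly emitted low watermark, or null if it did not advance.
    Long update(String partition, long watermark) {
        partitionWatermarks.merge(partition, watermark, Math::max);
        long min = partitionWatermarks.values().stream()
                .mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE);
        if (min > lastEmitted) {
            lastEmitted = min;
            return min;
        }
        return null;
    }
}
```

Once a lagging "big_topic" partition is tracked, updates from faster partitions no longer move the emitted watermark; it only advances when the slowest partition catches up.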
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Gordon
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>
>>>
>>
>
