Hi,
The watermark does not seem to get updated at all after the first one
is emitted. We used to get out-of-order warnings, but we changed the
job to use a bounded timestamp extractor, so we no longer get those
warnings.

Our timestamp extractor looks like this:

    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.windowing.time.Time

    // Extracts the event's own timestamp; the superclass emits watermarks
    // that trail the maximum timestamp seen so far by `time`.
    class TsExtractor[T](time: Time)
        extends BoundedOutOfOrdernessTimestampExtractor[Timestamped[T]](time) {
      override def extractTimestamp(element: Timestamped[T]): Long = element.timestamp
    }

Our stream topology starts with a single stream. We run two separate
flatMap and filter operations on the initial stream to transform the
data batches into streams of two different event types, then call
assignTimestampsAndWatermarks(new TsExtractor[EventType](Time.seconds(20)))
for each event type on both branches before unioning the two branches
into a single stream again (the reason for the split is that the data
used to come from two different topics). A sketch of the topology
follows below.
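A minimal sketch of that topology, assuming the usual
org.apache.flink.streaming.api.scala._ import; Batch, Event, the
extractTypeA/extractTypeB helpers and the filter predicates are
hypothetical stand-ins, and both branches share the Event type so the
union compiles:

    import org.apache.flink.streaming.api.scala._

    // Batch, Event and the extract/filter helpers are hypothetical.
    val batches: DataStream[Batch] = env.addSource(consumer)

    // Branch 1: decode batches into the first event type.
    val streamA: DataStream[Timestamped[Event]] = batches
      .flatMap(b => extractTypeA(b))  // hypothetical: Batch => Seq[Timestamped[Event]]
      .filter(e => isWanted(e))       // hypothetical predicate
      .assignTimestampsAndWatermarks(new TsExtractor[Event](Time.seconds(20)))

    // Branch 2: same pattern for the second event type.
    val streamB: DataStream[Timestamped[Event]] = batches
      .flatMap(b => extractTypeB(b))
      .filter(e => isWanted(e))
      .assignTimestampsAndWatermarks(new TsExtractor[Event](Time.seconds(20)))

    // The unioned stream's watermark advances with the *minimum* of the
    // two branches' watermarks, so a branch that sees no data (or emits
    // no watermarks) holds the whole union back.
    val events: DataStream[Timestamped[Event]] = streamA.union(streamB)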

William

----- Original Message -----
From: "Gary Yao" <g...@data-artisans.com>
To: "William Saar" <will...@saar.se>
Cc: "user" <user@flink.apache.org>
Sent: Thu, 18 Jan 2018 11:11:17 +0100
Subject: Re: Far too few watermarks getting generated with Kafka source

Hi William,

How often does the watermark get updated? Can you share the code that
generates your watermarks? Watermarks must be strictly ascending; if
your code produces watermarks that are not ascending, the smaller ones
will be discarded. Could it be that the events in Kafka are more "out
of order" with respect to event time than in your file?

You can assign timestamps in the Kafka source or later. The Flink
documentation has a section on why it can be beneficial to assign
watermarks in the Kafka source:

https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition
[1]
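For illustration, a hedged sketch of assigning the extractor on the
consumer itself, so that Flink tracks a watermark per Kafka partition
and merges them inside the source; the Kafka 0.11 connector class
matches Flink 1.4, while the topic name, properties and
TimestampedEventSchema are placeholders:

    import java.util.Properties
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "example-group")

    // Placeholder topic name and deserialization schema.
    val consumer = new FlinkKafkaConsumer011[Timestamped[Event]](
      "events", new TimestampedEventSchema, props)

    // Assigned here, the extractor runs inside the source, and watermarks
    // are generated per Kafka partition rather than downstream after
    // partitions have been mixed.
    consumer.assignTimestampsAndWatermarks(new TsExtractor[Event](Time.seconds(20)))

    val stream = env.addSource(consumer)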

Best,
Gary

On Wed, Jan 17, 2018 at 5:15 PM, William Saar <will...@saar.se [2]> wrote:
Hi,
I have a job where we read data from either Kafka or a file (for
testing), decode the entries and flat map them into events, and then
add a timestamp and watermark assigner to the events in a later
operation. This seems to generate periodic watermarks when running
from a file, but when Kafka is the source we barely get any watermark
updates. What could be causing this? (The environment has
setAutoWatermarkInterval(1000).)
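For reference, a minimal sketch of that environment configuration,
using only the documented Flink 1.4 API:

    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Watermarks are only generated when event time is enabled.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Ask periodic assigners for a new watermark every 1000 ms.
    env.getConfig.setAutoWatermarkInterval(1000)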

Do we need to do all the timestamp and watermark assignment in the
Kafka source, or should it work to do it in later operations? The
events do seem to get propagated through the pipeline; we're just not
getting watermarks...

Thanks,
William


Links:
------
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition
[2] mailto:will...@saar.se
