The case I described was only for experimentation, but data skew can happen
in production. The current implementation blocks watermark emission
to downstream operators until all partitions move forward, which has a great
impact on latency. It may be a good idea to expose an API that lets users
decide the best way to control watermark emission.
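As a sketch of what such user-side control could look like, here is plain Java with illustrative names (this is not an existing Flink API): track the wall-clock time of the last event and, once the source has been idle longer than a configurable timeout, derive the watermark from processing time instead of holding it at Long.MIN_VALUE.

```java
// Hedged sketch, not an existing Flink API: all names are illustrative.
// Idea: if no event has arrived within maxIdleMillis, advance the
// watermark from the wall clock so an empty partition cannot block
// emission forever.
public class IdleAwareWatermarkLogic {

    private final long maxIdleMillis;
    private long currentMaxTimestamp = Long.MIN_VALUE;
    private long lastEventWallTime;

    public IdleAwareWatermarkLogic(long maxIdleMillis, long nowMillis) {
        this.maxIdleMillis = maxIdleMillis;
        this.lastEventWallTime = nowMillis;
    }

    // Called for every record, mirroring extractTimestamp().
    public void onEvent(long eventTimestamp, long nowMillis) {
        lastEventWallTime = nowMillis;
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    // Called periodically, mirroring getCurrentWatermark().
    public long currentWatermark(long nowMillis) {
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            // Never saw an event: stay at MIN_VALUE until the idle
            // timeout expires, then advance from processing time.
            return nowMillis - lastEventWallTime > maxIdleMillis
                    ? nowMillis - maxIdleMillis
                    : Long.MIN_VALUE;
        }
        return currentMaxTimestamp - 1;
    }
}
```

Note this only helps if it is applied per partition, i.e. inside the consumer itself, which is exactly why exposing a hook there would be useful.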

On Fri, 13 Jan 2017 at 21:57 Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi,
>
> This is expected behaviour due to how the per-partition watermarks are
> designed in the Kafka consumer, but I think it’s probably a good idea to
> handle idle partitions also when the Kafka consumer itself emits
> watermarks. I’ve filed a JIRA issue for this:
> https://issues.apache.org/jira/browse/FLINK-5479.
>
> For the time being, I don’t think there will be an easy way to avoid this
> with the existing APIs, unfortunately. Is the skewed partition data
> intentional, or only for experimental purposes?
>
> Best,
> Gordon
>
> On January 12, 2017 at 5:28:40 PM, tao xiao (xiaotao...@gmail.com) wrote:
>
> Hi team,
>
> I have a topic with 2 partitions in Kafka. I produced all data to
> partition 0 and no data to partition 1. I created a Flink job with
> parallelism 1 that consumes that topic and counts the events with a session
> event window (5 seconds gap). It turned out that the session window was
> never closed, even when I sent a message with a 10-minute gap. After digging
> into the source code: AbstractFetcher[1], which is responsible for emitting
> watermarks to downstream operators, calculates the min watermark across all
> partitions. Since there is no data in partition 1, the watermark returned
> from partition 1 is always Long.MIN_VALUE, so AbstractFetcher never
> emits a watermark downstream.
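[To make the behaviour described above concrete, here is a minimal stand-alone illustration of the min-of-partitions logic; names are illustrative, not the actual AbstractFetcher internals.]

```java
// Illustrative stand-alone sketch, not actual AbstractFetcher code:
// the emitted watermark is the minimum across all partition watermarks,
// so one partition stuck at Long.MIN_VALUE suppresses emission entirely.
public class MinWatermarkDemo {

    static long minAcrossPartitions(long[] partitionWatermarks) {
        long min = Long.MAX_VALUE;
        for (long w : partitionWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        // Partition 0 has advanced; partition 1 never received data.
        long[] watermarks = {1_484_000_000_000L, Long.MIN_VALUE};
        // The idle partition pins the result at Long.MIN_VALUE,
        // so no watermark ever reaches downstream operators.
        System.out.println(minAcrossPartitions(watermarks) == Long.MIN_VALUE);
    }
}
```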
>
> I want to know if this is expected behavior or a bug. If it is expected
> behavior, how do I avoid the delay in watermark emission when data is not
> evenly distributed across all partitions?
>
> This is the timestamp extractor I used:
>
> public class ExactTimestampExtractor implements
>         AssignerWithPeriodicWatermarks<SessionEvent> {
>
>     private long currentMaxTimestamp = Long.MIN_VALUE;
>
>     @Nullable
>     @Override
>     public Watermark getCurrentWatermark() {
>         return new Watermark(currentMaxTimestamp == Long.MIN_VALUE ?
>                 Long.MIN_VALUE : currentMaxTimestamp - 1);
>     }
>
>     @Override
>     public long extractTimestamp(SessionEvent element,
>                                  long previousElementTimestamp) {
>         long eventStartTime = (long) element.get(SessionEvent.SESSION_START_DT);
>         if (eventStartTime > currentMaxTimestamp) {
>             currentMaxTimestamp = eventStartTime;
>         }
>         return eventStartTime;
>     }
> }
>
> and this is the Flink topology:
>
> // get input data
> FlinkKafkaConsumer010<SessionEvent> consumer =
>         new FlinkKafkaConsumer010<>("topic4", new MyOwnSchema());
> consumer.assignTimestampsAndWatermarks(new ExactTimestampExtractor());
> DataStream<SessionEvent> input = env.addSource(consumer);
>
> input.keyBy("id")
>         .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>         .reduce(new Reducer(), new WindowFunction())
>         .print();
>
> // execute program
> env.execute("a job");
>
> I used the latest code on GitHub.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java#L539
>
>
