Hi,

This is expected behaviour, given how per-partition watermarks are designed 
in the Kafka consumer. Still, I think it’s a good idea for the consumer to 
also handle idle partitions when it emits watermarks itself. I’ve filed a 
JIRA issue for this: https://issues.apache.org/jira/browse/FLINK-5479.

For the time being, I don’t think there will be an easy way to avoid this with 
the existing APIs, unfortunately. Is the skewed partition data intentional, or 
only for experimental purposes?

Best,
Gordon

On January 12, 2017 at 5:28:40 PM, tao xiao (xiaotao...@gmail.com) wrote:

Hi team,

I have a Kafka topic with 2 partitions. I produced all data to partition 0 
and no data to partition 1. I created a Flink job with parallelism 1 that 
consumes the topic and counts events using event-time session windows 
(5-second gap). The session window never closed, even after I sent a message 
following a 10-minute gap. Digging into the source code, I found that 
AbstractFetcher[1], which is responsible for emitting watermarks downstream, 
takes the minimum watermark across all partitions. Because partition 1 has no 
data, its watermark is always Long.MIN_VALUE, so AbstractFetcher never emits 
a watermark downstream.
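
To illustrate what I think is happening, here is a minimal sketch of the 
min-across-partitions logic. It is a simplified stand-in and not the actual 
AbstractFetcher code; the class and method names (MinWatermarkSketch, 
minAcrossPartitions) and the sample timestamps are made up for illustration.

```java
final class MinWatermarkSketch {

    // Hypothetical helper: combines per-partition watermarks by taking
    // their minimum, as I understand the fetcher to do.
    static long minAcrossPartitions(long[] partitionWatermarks) {
        long min = Long.MAX_VALUE;
        for (long watermark : partitionWatermarks) {
            min = Math.min(min, watermark);
        }
        return min;
    }

    public static void main(String[] args) {
        // Partition 0 has seen events up to t = 10_000 ms (watermark is t - 1);
        // partition 1 has never received a record.
        long[] watermarks = {9_999L, Long.MIN_VALUE};

        long combined = minAcrossPartitions(watermarks);

        // The idle partition pins the combined watermark at Long.MIN_VALUE.
        System.out.println(combined == Long.MIN_VALUE); // prints "true"
    }
}
```

As long as one partition stays at Long.MIN_VALUE, the combined value cannot 
advance, no matter how far partition 0 progresses.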

I want to know if this is expected behavior or a bug. If it is expected, how 
do I avoid the watermark delay when data is not evenly distributed across all 
partitions?

This is the timestamp extractor I used

public class ExactTimestampExtractor
        implements AssignerWithPeriodicWatermarks<SessionEvent> {

    private long currentMaxTimestamp = Long.MIN_VALUE;

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(
            currentMaxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentMaxTimestamp - 1);
    }

    @Override
    public long extractTimestamp(SessionEvent element, long previousElementTimestamp) {
        long eventStartTime = (long) element.get(SessionEvent.SESSION_START_DT);
        if (eventStartTime > currentMaxTimestamp) {
            currentMaxTimestamp = eventStartTime;
        }

        return eventStartTime;
    }
}

And this is the Flink topology:

// get input data
FlinkKafkaConsumer010<SessionEvent> consumer = new FlinkKafkaConsumer010<>(
    "topic4",
    new MyOwnSchema(),
    properties); // Kafka consumer Properties (definition omitted here)
consumer.assignTimestampsAndWatermarks(new ExactTimestampExtractor());
DataStream<SessionEvent> input = env.addSource(consumer);

input
    .keyBy("id")
    .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
    .reduce(new Reducer(), new WindowFunction())
    .print();

// execute program
env.execute("a job");

I used the latest code on GitHub.

[1] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java#L539
