[FLINK-3375] [kafka connector] Add per-Kafka-partition watermark generation to the FlinkKafkaConsumer
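In short: the consumer can now be given an AssignerWithPeriodicWatermarks or an AssignerWithPunctuatedWatermarks (one or the other, not both) and will then extract timestamps and generate watermarks inside the source, tracking the maximum timestamp seen per Kafka partition and emitting the minimum across all currently active partitions. A minimal usage sketch against the two setters this patch adds; the topic name, connection properties, and the "<timestamp>,<payload>" record format are illustrative assumptions, not part of the commit:

import java.util.Properties;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaWatermarkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "localhost:2181");   // assumed quickstart values
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "wm-demo");

        FlinkKafkaConsumer08<String> consumer =
                new FlinkKafkaConsumer08<>("demo-topic", new SimpleStringSchema(), props);

        // attach a periodic assigner: timestamps are extracted per record, and the
        // consumer emits watermarks itself, respecting per-partition event time
        consumer.setPeriodicWatermarkEmitter(new AssignerWithPeriodicWatermarks<String>() {
            private long maxTs = Long.MIN_VALUE;

            @Override
            public long extractTimestamp(String element, long previousElementTimestamp) {
                // assumes records of the form "<timestamp>,<payload>"
                long ts = Long.parseLong(element.substring(0, element.indexOf(',')));
                maxTs = Math.max(maxTs, ts);
                return ts;
            }

            @Override
            public Watermark getCurrentWatermark() {
                // null is allowed here; the consumer simply emits no watermark yet
                return maxTs == Long.MIN_VALUE ? null : new Watermark(maxTs - 1);
            }
        });

        env.addSource(consumer).print();
        env.execute("per-partition watermark sketch");
    }
}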
This closes #1839 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ac1549e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ac1549e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ac1549e Branch: refs/heads/master Commit: 0ac1549ec58c1737a79e5770a171a8b14bed56dc Parents: 885d543 Author: kl0u <kklou...@gmail.com> Authored: Tue Mar 8 17:35:14 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Apr 13 01:10:54 2016 +0200 ---------------------------------------------------------------------- .../connectors/kafka/FlinkKafkaConsumer08.java | 56 +-- .../connectors/kafka/internals/Fetcher.java | 12 +- .../kafka/internals/LegacyFetcher.java | 51 ++- .../kafka/internals/ZookeeperOffsetHandler.java | 4 +- .../connectors/kafka/Kafka08ITCase.java | 16 +- .../connectors/kafka/KafkaConsumerTest.java | 12 +- .../connectors/kafka/FlinkKafkaConsumer09.java | 13 +- .../connectors/kafka/Kafka09ITCase.java | 11 +- .../kafka/FlinkKafkaConsumerBase.java | 357 ++++++++++++++++++- .../kafka/internals/KafkaPartitionState.java | 65 ++++ .../connectors/kafka/KafkaConsumerTestBase.java | 234 +++++++++++- .../AssignerWithPeriodicWatermarks.java | 2 +- 12 files changed, 744 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0ac1549e/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java index 865fe36..4748781 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java @@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.connectors.kafka.internals.Fetcher; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionState; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher; import org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler; @@ -108,11 +109,6 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> { private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer08.class); - /** Magic number to define an unset offset. Negative offsets are not used by Kafka (invalid), - * and we pick a number that is probably (hopefully) not used by Kafka as a magic number for anything else. 
*/ - public static final long OFFSET_NOT_SET = -915623761776L; - - /** Configuration key for the number of retries for getting the partition info */ public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry"; @@ -252,17 +248,19 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> { this.fetcher = null; // fetcher remains null return; } - // offset handling offsetHandler = new ZookeeperOffsetHandler(props); committedOffsets = new HashMap<>(); - Map<KafkaTopicPartition, Long> subscribedPartitionsWithOffsets = new HashMap<>(subscribedPartitions.size()); - // initially load the map with "offset not set" + // initially load the map with "offset not set", last max read timestamp set to Long.MIN_VALUE + // and mark the partition as inactive, until we receive the first element + Map<KafkaTopicPartition, KafkaPartitionState> subscribedPartitionsWithOffsets = + new HashMap<>(subscribedPartitions.size()); for(KafkaTopicPartition ktp: subscribedPartitions) { - subscribedPartitionsWithOffsets.put(ktp, FlinkKafkaConsumer08.OFFSET_NOT_SET); + subscribedPartitionsWithOffsets.put(ktp, + new KafkaPartitionState(ktp.getPartition(), FlinkKafkaConsumerBase.OFFSET_NOT_SET)); } // seek to last known pos, from restore request @@ -272,16 +270,20 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> { thisConsumerIndex, KafkaTopicPartition.toString(restoreToOffset)); } // initialize offsets with restored state - this.offsetsState = restoreToOffset; - subscribedPartitionsWithOffsets.putAll(restoreToOffset); + this.partitionState = restoreInfoFromCheckpoint(); + subscribedPartitionsWithOffsets.putAll(partitionState); restoreToOffset = null; } else { - // start with empty offsets - offsetsState = new HashMap<>(); + // start with empty partition state + partitionState = new HashMap<>(); // no restore request: overwrite offsets. + for(Map.Entry<KafkaTopicPartition, Long> offsetInfo: offsetHandler.getOffsets(subscribedPartitions).entrySet()) { + KafkaTopicPartition key = offsetInfo.getKey(); + subscribedPartitionsWithOffsets.put(key, + new KafkaPartitionState(key.getPartition(), offsetInfo.getValue())); + } } if(subscribedPartitionsWithOffsets.size() != subscribedPartitions.size()) { throw new IllegalStateException("The subscribed partitions map has more entries than the subscribed partitions " + @@ -289,22 +291,22 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> { } // create fetcher - fetcher = new LegacyFetcher(subscribedPartitionsWithOffsets, props, + fetcher = new LegacyFetcher<T>(this, subscribedPartitionsWithOffsets, props, getRuntimeContext().getTaskName(), getRuntimeContext().getUserCodeClassLoader()); } @Override public void run(SourceContext<T> sourceContext) throws Exception { if (fetcher != null) { - // For non-checkpointed sources, a thread which periodically commits the current offset into ZK. - PeriodicOffsetCommitter<T> offsetCommitter = null; - - // check whether we need to start the periodic checkpoint committer StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) getRuntimeContext(); + + // if we have a non-checkpointed source, start a thread which periodically commits + // the current offset into ZK. + + PeriodicOffsetCommitter<T> offsetCommitter = null; if (!streamingRuntimeContext.isCheckpointingEnabled()) { // we use Kafka's own configuration parameter key for this.
- // Note that the default configuration value in Kafka is 60 * 1000, so we use the - // same here. + // Note that the default configuration value in Kafka is 60 * 1000, so we use the same here. long commitInterval = getLongFromConfig(props, "auto.commit.interval.ms", 60000); offsetCommitter = new PeriodicOffsetCommitter<>(commitInterval, this); offsetCommitter.setDaemon(true); @@ -313,7 +315,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> { } try { - fetcher.run(sourceContext, deserializer, offsetsState); + fetcher.run(sourceContext, deserializer, partitionState); } finally { if (offsetCommitter != null) { offsetCommitter.close(); @@ -439,7 +441,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> { private final FlinkKafkaConsumer08<T> consumer; private volatile boolean running = true; - public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer08<T> consumer) { + PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer08<T> consumer) { this.commitInterval = commitInterval; this.consumer = consumer; } @@ -453,9 +455,12 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> { Thread.sleep(commitInterval); // ------------ commit current offsets ---------------- - // create copy of current offsets + // create a deep copy of the current offsets @SuppressWarnings("unchecked") - HashMap<KafkaTopicPartition, Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) consumer.offsetsState.clone(); + HashMap<KafkaTopicPartition, Long> currentOffsets = new HashMap<>(consumer.partitionState.size()); + for (Map.Entry<KafkaTopicPartition, KafkaPartitionState> entry: consumer.partitionState.entrySet()) { + currentOffsets.put(entry.getKey(), entry.getValue().getOffset()); + } consumer.commitOffsets(currentOffsets); } catch (InterruptedException e) { if (running) { @@ -474,7 +479,6 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> { this.running = false; this.interrupt(); } - } http://git-wip-us.apache.org/repos/asf/flink/blob/0ac1549e/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java index 2dacbce..f86687e 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java @@ -39,8 +39,10 @@ public interface Fetcher { /** * Starts fetching data from Kafka and emitting it into the stream. * - <p>To provide exactly once guarantees, the fetcher needs emit a record and update the update - of the last consumed offset in one atomic operation:</p> + <p>To provide exactly once guarantees, the fetcher needs to emit a record and update the last + consumed offset in one atomic operation.
This is done in the + {@link org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#processElement(SourceFunction.SourceContext, KafkaTopicPartition, Object, long)} + which is called while holding the lock from {@link SourceFunction.SourceContext#getCheckpointLock()}, as shown below:</p> * <pre>{@code * * while (running) { @@ -48,8 +50,7 @@ public interface Fetcher { * long offset = ... * int partition = ... * synchronized (sourceContext.getCheckpointLock()) { - * sourceContext.collect(next); - * lastOffsets[partition] = offset; + * processElement(sourceContext, partition, next, offset); * } * } * }</pre> @@ -60,8 +61,7 @@ public interface Fetcher { * @param lastOffsets The map into which to store the offsets for which elements are emitted (operator state) */ <T> void run(SourceFunction.SourceContext<T> sourceContext, KeyedDeserializationSchema<T> valueDeserializer, - HashMap<KafkaTopicPartition, Long> lastOffsets) throws Exception; - + HashMap<KafkaTopicPartition, KafkaPartitionState> lastOffsets) throws Exception; /** * Exit run loop with given error and release all resources. http://git-wip-us.apache.org/repos/asf/flink/blob/0ac1549e/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java index fa3b6a8..c4dd55c 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java @@ -30,6 +30,7 @@ import kafka.message.MessageAndOffset; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.StringUtils; @@ -61,13 +62,12 @@ import static org.apache.flink.streaming.connectors.kafka.util.KafkaUtils.getInt * * <p>This code is in parts based on the tutorial code for the low-level Kafka consumer.</p> */ -public class LegacyFetcher implements Fetcher { +public class LegacyFetcher<T> implements Fetcher { private static final Logger LOG = LoggerFactory.getLogger(LegacyFetcher.class); private static final FetchPartition MARKER = new FetchPartition("n/a", -1, -1); - - + /** The properties that configure the Kafka connection */ private final Properties config; @@ -79,7 +79,6 @@ public class LegacyFetcher implements Fetcher { /** The classloader for dynamically loaded classes */ private final ClassLoader userCodeClassloader; - /** Reference to the thread that executed the run() method. */ private volatile Thread mainThread; @@ -94,6 +93,9 @@ public class LegacyFetcher implements Fetcher { */ private final ClosableBlockingQueue<FetchPartition> unassignedPartitions = new ClosableBlockingQueue<>(); + /** The {@link FlinkKafkaConsumer08} to which this Fetcher belongs.
*/ + private final FlinkKafkaConsumer08<T> flinkKafkaConsumer; + /** * Create a LegacyFetcher instance. * @@ -103,20 +105,23 @@ public class LegacyFetcher implements Fetcher { * @param userCodeClassloader classloader for loading user code */ public LegacyFetcher( - Map<KafkaTopicPartition, Long> initialPartitionsToRead, Properties props, + FlinkKafkaConsumer08<T> owner, + Map<KafkaTopicPartition, KafkaPartitionState> initialPartitionsToRead, + Properties props, String taskName, ClassLoader userCodeClassloader) { - + + this.flinkKafkaConsumer = requireNonNull(owner); this.config = requireNonNull(props, "The config properties cannot be null"); this.userCodeClassloader = requireNonNull(userCodeClassloader); if (initialPartitionsToRead.size() == 0) { throw new IllegalArgumentException("List of initial partitions is empty"); } - for (Map.Entry<KafkaTopicPartition, Long> partitionToRead: initialPartitionsToRead.entrySet()) { + for (Map.Entry<KafkaTopicPartition, KafkaPartitionState> partitionToRead: initialPartitionsToRead.entrySet()) { KafkaTopicPartition ktp = partitionToRead.getKey(); // we increment the offset by one so that we fetch the next message in the partition. - long offset = partitionToRead.getValue(); - if (offset >= 0 && offset != FlinkKafkaConsumer08.OFFSET_NOT_SET) { + long offset = partitionToRead.getValue().getOffset(); + if (offset >= 0 && offset != FlinkKafkaConsumerBase.OFFSET_NOT_SET) { offset += 1L; } unassignedPartitions.add(new FetchPartition(ktp.getTopic(), ktp.getPartition(), offset)); @@ -141,7 +146,7 @@ public class LegacyFetcher implements Fetcher { @Override public <T> void run(SourceFunction.SourceContext<T> sourceContext, KeyedDeserializationSchema<T> deserializer, - HashMap<KafkaTopicPartition, Long> lastOffsets) throws Exception { + HashMap<KafkaTopicPartition, KafkaPartitionState> partitionState) throws Exception { // NOTE: This method needs to always release all resources it acquires @@ -173,7 +178,7 @@ public class LegacyFetcher implements Fetcher { if (brokerThread == null || !brokerThread.getNewPartitionsQueue().isOpen()) { // start new thread - brokerThread = createAndStartSimpleConsumerThread(sourceContext, deserializer, lastOffsets, partitions, leader); + brokerThread = createAndStartSimpleConsumerThread(sourceContext, deserializer, partitions, leader); brokerToThread.put(leader, brokerThread); } else { // put elements into queue of thread @@ -185,7 +190,7 @@ public class LegacyFetcher implements Fetcher { // create a new thread for connecting to this broker List<FetchPartition> seedPartitions = new ArrayList<>(); seedPartitions.add(fp); - brokerThread = createAndStartSimpleConsumerThread(sourceContext, deserializer, lastOffsets, seedPartitions, leader); + brokerThread = createAndStartSimpleConsumerThread(sourceContext, deserializer, seedPartitions, leader); brokerToThread.put(leader, brokerThread); newPartitionsQueue = brokerThread.getNewPartitionsQueue(); // update queue for the subsequent partitions } @@ -241,15 +246,13 @@ public class LegacyFetcher implements Fetcher { private <T> SimpleConsumerThread<T> createAndStartSimpleConsumerThread(SourceFunction.SourceContext<T> sourceContext, KeyedDeserializationSchema<T> deserializer, - HashMap<KafkaTopicPartition, Long> lastOffsets, List<FetchPartition> seedPartitions, Node leader) throws IOException, ClassNotFoundException { - SimpleConsumerThread<T> brokerThread; final KeyedDeserializationSchema<T> clonedDeserializer = InstantiationUtil.clone(deserializer, userCodeClassloader); // seed thread with list 
of fetch partitions (otherwise it would shut down immediately again) - brokerThread = new SimpleConsumerThread<>(this, config, - leader, seedPartitions, unassignedPartitions, sourceContext, clonedDeserializer, lastOffsets); + SimpleConsumerThread<T> brokerThread = new SimpleConsumerThread<>(this, config, + leader, seedPartitions, unassignedPartitions, sourceContext, clonedDeserializer); brokerThread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)", taskName, leader.id(), leader.host(), leader.port())); @@ -387,8 +390,7 @@ public class LegacyFetcher implements Fetcher { private final SourceFunction.SourceContext<T> sourceContext; private final KeyedDeserializationSchema<T> deserializer; - private final HashMap<KafkaTopicPartition, Long> offsetsState; - + private final List<FetchPartition> partitions; private final Node broker; @@ -399,8 +401,6 @@ public class LegacyFetcher implements Fetcher { private final ClosableBlockingQueue<FetchPartition> unassignedPartitions; - - private volatile boolean running = true; /** Queue containing new fetch partitions for the consumer thread */ @@ -424,15 +424,13 @@ public class LegacyFetcher implements Fetcher { List<FetchPartition> seedPartitions, ClosableBlockingQueue<FetchPartition> unassignedPartitions, SourceFunction.SourceContext<T> sourceContext, - KeyedDeserializationSchema<T> deserializer, - HashMap<KafkaTopicPartition, Long> offsetsState) { + KeyedDeserializationSchema<T> deserializer) { this.owner = owner; this.config = config; this.broker = broker; this.partitions = seedPartitions; this.sourceContext = requireNonNull(sourceContext); this.deserializer = requireNonNull(deserializer); - this.offsetsState = requireNonNull(offsetsState); this.unassignedPartitions = requireNonNull(unassignedPartitions); this.soTimeout = getIntFromConfig(config, "socket.timeout.ms", 30000); @@ -661,8 +659,7 @@ public class LegacyFetcher implements Fetcher { continue partitionsLoop; } synchronized (sourceContext.getCheckpointLock()) { - sourceContext.collect(value); - offsetsState.put(topicPartition, offset); + owner.flinkKafkaConsumer.processElement(sourceContext, topicPartition, value, offset); } // advance offset for the next request @@ -703,7 +700,7 @@ public class LegacyFetcher implements Fetcher { List<FetchPartition> partitionsToGetOffsetsFor = new ArrayList<>(); for (FetchPartition fp : partitions) { - if (fp.nextOffsetToRead == FlinkKafkaConsumer08.OFFSET_NOT_SET) { + if (fp.nextOffsetToRead == FlinkKafkaConsumerBase.OFFSET_NOT_SET) { // retrieve the offset from the consumer partitionsToGetOffsetsFor.add(fp); } @@ -716,7 +713,7 @@ public class LegacyFetcher implements Fetcher { // we subtract 1 from the offset synchronized (sourceContext.getCheckpointLock()) { for(FetchPartition fp: partitionsToGetOffsetsFor) { - this.offsetsState.put(new KafkaTopicPartition(fp.topic, fp.partition), fp.nextOffsetToRead - 1L); + owner.flinkKafkaConsumer.updateOffsetForPartition(new KafkaTopicPartition(fp.topic, fp.partition), fp.nextOffsetToRead - 1L); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/0ac1549e/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java index 003e24f..328cab0 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.java @@ -24,7 +24,7 @@ import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.slf4j.Logger; @@ -43,7 +43,7 @@ public class ZookeeperOffsetHandler implements OffsetHandler { private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class); - private static final long OFFSET_NOT_SET = FlinkKafkaConsumer08.OFFSET_NOT_SET; + private static final long OFFSET_NOT_SET = FlinkKafkaConsumerBase.OFFSET_NOT_SET; private final String groupId; http://git-wip-us.apache.org/repos/asf/flink/blob/0ac1549e/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java index d704fbd..d6ee968 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java @@ -61,6 +61,16 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { } @Test(timeout = 60000) + public void testPunctuatedExplicitWMConsumer() throws Exception { + runExplicitPunctuatedWMgeneratingConsumerTest(false); + } + + @Test(timeout = 60000) + public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception { + runExplicitPunctuatedWMgeneratingConsumerTest(true); + } + + @Test(timeout = 60000) public void testKeyValueSupport() throws Exception { runKeyValueTest(); } @@ -198,9 +208,9 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3); - assertTrue(o1 == FlinkKafkaConsumer08.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100)); - assertTrue(o2 == FlinkKafkaConsumer08.OFFSET_NOT_SET || (o2 >= 0 && o2 <= 100)); - assertTrue(o3 == FlinkKafkaConsumer08.OFFSET_NOT_SET || (o3 >= 0 && o3 <= 100)); + assertTrue(o1 == FlinkKafkaConsumerBase.OFFSET_NOT_SET || (o1 >= 0 && o1 <= 100)); + assertTrue(o2 == FlinkKafkaConsumerBase.OFFSET_NOT_SET || (o2 >= 0 && o2 <= 100)); + assertTrue(o3 == FlinkKafkaConsumerBase.OFFSET_NOT_SET || (o3 >= 0 && o3 <= 100)); LOG.info("Manipulating offsets"); http://git-wip-us.apache.org/repos/asf/flink/blob/0ac1549e/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java 
---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java index 113ad71..7337f65 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTest.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.commons.collections.map.LinkedMap; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionState; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -81,7 +82,7 @@ public class KafkaConsumerTest { @Test public void testSnapshot() { try { - Field offsetsField = FlinkKafkaConsumerBase.class.getDeclaredField("offsetsState"); + Field offsetsField = FlinkKafkaConsumerBase.class.getDeclaredField("partitionState"); Field runningField = FlinkKafkaConsumerBase.class.getDeclaredField("running"); Field mapField = FlinkKafkaConsumerBase.class.getDeclaredField("pendingCheckpoints"); @@ -92,18 +93,19 @@ public class KafkaConsumerTest { FlinkKafkaConsumer08<?> consumer = mock(FlinkKafkaConsumer08.class); when(consumer.snapshotState(anyLong(), anyLong())).thenCallRealMethod(); - + HashMap<KafkaTopicPartition, KafkaPartitionState> testState = new HashMap<>(); HashMap<KafkaTopicPartition, Long> testOffsets = new HashMap<>(); long[] offsets = new long[] { 43, 6146, 133, 16, 162, 616 }; int j = 0; for (long i: offsets) { KafkaTopicPartition ktp = new KafkaTopicPartition("topic", j++); + testState.put(ktp, new KafkaPartitionState(ktp.getPartition(), i)); testOffsets.put(ktp, i); } LinkedMap map = new LinkedMap(); - offsetsField.set(consumer, testOffsets); + offsetsField.set(consumer, testState); runningField.set(consumer, true); mapField.set(consumer, map); @@ -118,7 +120,9 @@ public class KafkaConsumerTest { HashMap<KafkaTopicPartition, Long> checkpointCopy = (HashMap<KafkaTopicPartition, Long>) checkpoint.clone(); for (Map.Entry<KafkaTopicPartition, Long> e: testOffsets.entrySet()) { - testOffsets.put(e.getKey(), e.getValue() + 1); + KafkaTopicPartition ktp = e.getKey(); + testState.put(ktp, new KafkaPartitionState(ktp.getPartition(), e.getValue() + 1)); + testOffsets.put(ktp, e.getValue() + 1); } assertEquals(checkpointCopy, checkpoint); http://git-wip-us.apache.org/repos/asf/flink/blob/0ac1549e/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java index 3b780bd..55f9875 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java +++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java @@ -269,7 +269,7 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> { this.subscribedPartitionsAsFlink = assignPartitions(this.partitionInfos, numConsumers, thisConsumerIndex); if(this.subscribedPartitionsAsFlink.isEmpty()) { LOG.info("This consumer doesn't have any partitions assigned"); - this.offsetsState = null; + this.partitionState = null; return; } else { StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) getRuntimeContext(); @@ -302,13 +302,13 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> { // check if we need to explicitly seek to a specific offset (restore case) if(restoreToOffset != null) { // we are in a recovery scenario - for(Map.Entry<KafkaTopicPartition, Long> offset: restoreToOffset.entrySet()) { + for(Map.Entry<KafkaTopicPartition, Long> info: restoreToOffset.entrySet()) { // seek all offsets to the right position - this.consumer.seek(new TopicPartition(offset.getKey().getTopic(), offset.getKey().getPartition()), offset.getValue() + 1); + this.consumer.seek(new TopicPartition(info.getKey().getTopic(), info.getKey().getPartition()), info.getValue() + 1); } - this.offsetsState = restoreToOffset; + this.partitionState = restoreInfoFromCheckpoint(); } else { - this.offsetsState = new HashMap<>(); + this.partitionState = new HashMap<>(); } } @@ -474,8 +474,7 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> { break pollLoop; } synchronized (sourceContext.getCheckpointLock()) { - sourceContext.collect(value); - flinkKafkaConsumer.offsetsState.put(flinkPartition, record.offset()); + flinkKafkaConsumer.processElement(sourceContext, flinkPartition, value, record.offset()); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/0ac1549e/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java index eb152a2..82e1dce 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java @@ -36,13 +36,22 @@ public class Kafka09ITCase extends KafkaConsumerTestBase { runFailOnNoBrokerTest(); } - @Test(timeout = 60000) public void testConcurrentProducerConsumerTopology() throws Exception { runSimpleConcurrentProducerConsumerTopology(); } @Test(timeout = 60000) + public void testPunctuatedExplicitWMConsumer() throws Exception { + runExplicitPunctuatedWMgeneratingConsumerTest(false); + } + + @Test(timeout = 60000) + public void testPunctuatedExplicitWMConsumerWithEmptyTopic() throws Exception { + runExplicitPunctuatedWMgeneratingConsumerTest(true); + } + + @Test(timeout = 60000) public void testKeyValueSupport() throws Exception { runKeyValueTest(); } 
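The core of the change follows in FlinkKafkaConsumerBase. Before the diff, a condensed restatement of the emission rule it implements (a sketch mirroring emitWatermarkIfMarkingProgress() and getMinTimestampAcrossAllTopics() below, using the fields and types this patch introduces; not the committed code itself):

// Each partition tracks the maximum timestamp it has seen. The watermark
// candidate is the minimum of these maxima over all *active* partitions,
// emitted only if it advances past the last emitted watermark.
// Assumes the patch's fields: partitionState, lastEmittedWatermark.
private boolean emitWatermarkSketch(SourceFunction.SourceContext<T> sourceContext) {
    long candidateTs = Long.MAX_VALUE;
    KafkaTopicPartition slowest = null;
    for (Map.Entry<KafkaTopicPartition, KafkaPartitionState> e : partitionState.entrySet()) {
        KafkaPartitionState s = e.getValue();
        if (s.isActive() && s.getMaxTimestamp() < candidateTs) {
            candidateTs = s.getMaxTimestamp();
            slowest = e.getKey();
        }
    }
    if (slowest == null) {
        return false; // no active partition has seen an element yet
    }
    // park the slowest partition until its next non-late element arrives,
    // so that an idle partition cannot hold the watermark back forever
    partitionState.get(slowest).setActive(false);
    synchronized (sourceContext.getCheckpointLock()) {
        if (candidateTs > lastEmittedWatermark) {
            lastEmittedWatermark = candidateTs;
            sourceContext.emitWatermark(new Watermark(candidateTs));
            return true;
        }
    }
    return false;
}

Note the trade-off this encodes: emitting the minimum across partitions keeps watermarks correct when partitions progress at different speeds, while the active/inactive flag prevents an empty partition (exercised by the new ...WithEmptyTopic tests) from stalling event time.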
http://git-wip-us.apache.org/repos/asf/flink/blob/0ac1549e/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java index 5f20f16..d9e813f 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java @@ -18,12 +18,22 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.commons.collections.map.LinkedMap; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.functions.TimestampAssigner; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionState; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +48,7 @@ import static java.util.Objects.requireNonNull; import static org.apache.flink.streaming.connectors.kafka.util.KafkaUtils.checkArgument; public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> - implements CheckpointListener, CheckpointedAsynchronously<HashMap<KafkaTopicPartition, Long>>, ResultTypeQueryable<T> { + implements CheckpointListener, CheckpointedAsynchronously<HashMap<KafkaTopicPartition, Long>>, ResultTypeQueryable<T>, Triggerable { // ------------------------------------------------------------------------ @@ -46,6 +56,10 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti private static final long serialVersionUID = -6272159445203409112L; + /** Magic number to define an unset offset. Negative offsets are not used by Kafka (invalid), + * and we pick a number that is probably (hopefully) not used by Kafka as a magic number for anything else. 
*/ + public static final long OFFSET_NOT_SET = -915623761776L; + /** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks */ public static final int MAX_NUM_PENDING_CHECKPOINTS = 100; @@ -58,8 +72,13 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti /** Data for pending but uncommitted checkpoints */ protected final LinkedMap pendingCheckpoints = new LinkedMap(); - /** The offsets of the last returned elements */ - protected transient HashMap<KafkaTopicPartition, Long> offsetsState; + /** + * Information about the partitions being read by the local consumer. This contains: + * the offsets of the last returned elements and, if a timestamp assigner is used, + * the maximum timestamp seen so far in each partition, along with whether the partition + * still receives elements or is inactive. + */ + protected transient HashMap<KafkaTopicPartition, KafkaPartitionState> partitionState; /** The offsets to restore to, if the consumer restores state from a checkpoint */ protected transient HashMap<KafkaTopicPartition, Long> restoreToOffset; @@ -68,13 +87,41 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti protected volatile boolean running = true; // ------------------------------------------------------------------------ + // WATERMARK EMISSION + // ------------------------------------------------------------------------ + + /** + * The user-specified methods to extract the timestamps from the records in Kafka, and + * to decide when to emit watermarks. + */ + private AssignerWithPunctuatedWatermarks<T> punctuatedWatermarkAssigner; + + /** + * The user-specified methods to extract the timestamps from the records in Kafka, and + * to decide when to emit watermarks. + */ + private AssignerWithPeriodicWatermarks<T> periodicWatermarkAssigner; + + private StreamingRuntimeContext runtime = null; + + private SourceContext<T> srcContext = null; + + /** + * The interval between consecutive periodic watermark emissions, + * as configured via the {@link ExecutionConfig#getAutoWatermarkInterval()}. + */ + private long watermarkInterval = -1; + /** The last emitted watermark. */ + private long lastEmittedWatermark = Long.MIN_VALUE; + + // ------------------------------------------------------------------------ /** * Creates a new Flink Kafka Consumer, using the given type of fetcher and offset handler. * * <p>To determine which kind of fetcher and offset handler to use, please refer to the docs - * at the beginnign of this class.</p> + * at the beginning of this class.</p> * * @param deserializer * The deserializer to turn raw byte messages into Java/Scala objects. @@ -85,13 +132,300 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti this.deserializer = requireNonNull(deserializer, "valueDeserializer"); } + /** + * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks in a punctuated manner. Bear in mind + * that the source can either have an {@link AssignerWithPunctuatedWatermarks} or an + * {@link AssignerWithPeriodicWatermarks}, not both. + */ + public FlinkKafkaConsumerBase<T> setPunctuatedWatermarkEmitter(AssignerWithPunctuatedWatermarks<T> assigner) { + checkEmitterDuringInit(); + this.punctuatedWatermarkAssigner = assigner; + return this; + } + + /** + * Specifies an {@link AssignerWithPeriodicWatermarks} to emit watermarks periodically.
Bear in mind that the + source can either have an {@link AssignerWithPunctuatedWatermarks} or an + {@link AssignerWithPeriodicWatermarks}, not both. + */ + public FlinkKafkaConsumerBase<T> setPeriodicWatermarkEmitter(AssignerWithPeriodicWatermarks<T> assigner) { + checkEmitterDuringInit(); + this.periodicWatermarkAssigner = assigner; + return this; + } + + /** + * Processes the element after having been read from Kafka and deserialized, and updates the + * last read offset for the specified partition. These two actions should be performed in + * an atomic way in order to guarantee exactly once semantics. + * @param sourceContext + * The context the task operates in. + * @param partDescriptor + * A descriptor containing the topic and the id of the partition. + * @param value + * The element to process. + * @param offset + * The offset of the element in the partition. + * */ + public void processElement(SourceContext<T> sourceContext, KafkaTopicPartition partDescriptor, T value, long offset) { + if (punctuatedWatermarkAssigner == null && periodicWatermarkAssigner == null) { + // the case where no watermark emitter is specified. + sourceContext.collect(value); + } else { + + if (srcContext == null) { + srcContext = sourceContext; + } + + long extractedTimestamp = extractTimestampAndEmitElement(partDescriptor, value); + + // depending on the specified watermark emitter, either send a punctuated watermark, + // or set the timer for the first periodic watermark. In the periodic case, we set the timer + // only for the first watermark, as it is the trigger() that will set the subsequent ones. + + if (punctuatedWatermarkAssigner != null) { + final Watermark nextWatermark = punctuatedWatermarkAssigner + .checkAndGetNextWatermark(value, extractedTimestamp); + if (nextWatermark != null) { + emitWatermarkIfMarkingProgress(sourceContext); + } + } else if(periodicWatermarkAssigner != null && runtime == null) { + runtime = (StreamingRuntimeContext) getRuntimeContext(); + watermarkInterval = runtime.getExecutionConfig().getAutoWatermarkInterval(); + if (watermarkInterval > 0) { + runtime.registerTimer(System.currentTimeMillis() + watermarkInterval, this); + } + } + } + updateOffsetForPartition(partDescriptor, offset); + } + + /** + * Extract the timestamp from the element based on the user-specified extractor, + * emit the element with the new timestamp, and update the partition monitoring info (if necessary). + * In more detail, upon reception of an element with a timestamp greater than the greatest timestamp + * seen so far in that partition, this method updates the maximum timestamp seen for that partition, + * and marks the partition as {@code active}, i.e. it still receives fresh data. + * @param partDescriptor the partition the new element belongs to. + * @param value the element to be forwarded. + * @return the timestamp of the new element. + */ + private long extractTimestampAndEmitElement(KafkaTopicPartition partDescriptor, T value) { + long extractedTimestamp = getTimestampAssigner().extractTimestamp(value, Long.MIN_VALUE); + srcContext.collectWithTimestamp(value, extractedTimestamp); + updateMaximumTimestampForPartition(partDescriptor, extractedTimestamp); + return extractedTimestamp; + } + + /** + * Upon reception of an element with a timestamp greater than the greatest timestamp seen so far in the partition, + * this method updates the maximum timestamp seen for that partition to {@code timestamp}, and marks the partition + * as {@code active}, i.e. it still receives fresh data.
If the partition is not known to the system, then a new + {@link KafkaPartitionState} is created and is associated with the new partition for future monitoring. + * @param partDescriptor + * A descriptor containing the topic and the id of the partition. + * @param timestamp + * The timestamp to set the maximum to, if greater than the one already present. + * @return {@code true} if the maximum was successfully updated to {@code timestamp}, {@code false} + * if the provided timestamp is not greater than the previous maximum + * */ + private boolean updateMaximumTimestampForPartition(KafkaTopicPartition partDescriptor, long timestamp) { + KafkaPartitionState info = getOrInitializeInfo(partDescriptor); + + if(timestamp > info.getMaxTimestamp()) { + + // the flag is set to false as soon as the current partition's max timestamp is sent as a watermark. + // if then, and for that partition, only late elements arrive, then the max timestamp will stay the + // same, and it will keep the overall system from progressing. + // To avoid this, we only mark a partition as active on non-late elements. + + info.setActive(true); + info.setMaxTimestamp(timestamp); + return true; + } + return false; + } + + /** + * Updates the last read offset for the partition specified by the {@code partDescriptor} to {@code offset}. + * If it is the first time we see the partition, then a new {@link KafkaPartitionState} is created to monitor + * this specific partition. + * @param partDescriptor the partition whose info to update. + * @param offset the last read offset of the partition. + */ + public void updateOffsetForPartition(KafkaTopicPartition partDescriptor, long offset) { + KafkaPartitionState info = getOrInitializeInfo(partDescriptor); + info.setOffset(offset); + } + + @Override + public void trigger(long timestamp) throws Exception { + if(this.srcContext == null) { + // if the trigger is called before any elements, then we + // just set the next timer to fire when it should and we + // ignore the triggering as this would produce no results. + setNextWatermarkTimer(); + return; + } + + // this is valid because this method is only called when watermarks + // are set to be emitted periodically. + final Watermark nextWatermark = periodicWatermarkAssigner.getCurrentWatermark(); + if(nextWatermark != null) { + emitWatermarkIfMarkingProgress(srcContext); + } + setNextWatermarkTimer(); + } + + /** + * Emits a new watermark, with timestamp equal to the minimum across all the maximum timestamps + * seen per local partition (across all topics). The new watermark is emitted if and only if + * it signals progress in event-time, i.e. if its timestamp is greater than the timestamp of + * the last emitted watermark. In addition, this method marks as inactive the partition whose + * timestamp was emitted as watermark, i.e. the one with the minimum across the maximum timestamps + * of the local partitions. This is done so that a partition that stopped receiving data + * cannot hold back the watermark for all partitions. The partition will be marked as {@code active} + * again as soon as the <i>next non-late</i> element arrives. + * + * @return {@code true} if the Watermark was successfully emitted, {@code false} otherwise.
+ */ + private boolean emitWatermarkIfMarkingProgress(SourceFunction.SourceContext<T> sourceContext) { + Tuple2<KafkaTopicPartition, Long> globalMinTs = getMinTimestampAcrossAllTopics(); + if(globalMinTs.f0 != null ) { + synchronized (sourceContext.getCheckpointLock()) { + long minTs = globalMinTs.f1; + if(minTs > lastEmittedWatermark) { + lastEmittedWatermark = minTs; + Watermark toEmit = new Watermark(minTs); + sourceContext.emitWatermark(toEmit); + return true; + } + } + } + return false; + } + + /** + * Kafka sources with timestamp extractors are expected to keep the maximum timestamp seen per + * partition they are reading from. This is to mark the per-partition event-time progress. + * + * This method iterates this list, and returns the minimum timestamp across these per-partition + * max timestamps, and across all topics. In addition to this information, it also returns the topic and + * the partition within the topic the timestamp belongs to. + */ + private Tuple2<KafkaTopicPartition, Long> getMinTimestampAcrossAllTopics() { + Tuple2<KafkaTopicPartition, Long> minTimestamp = new Tuple2<>(null, Long.MAX_VALUE); + for(Map.Entry<KafkaTopicPartition, KafkaPartitionState> entries: partitionState.entrySet()) { + KafkaTopicPartition part = entries.getKey(); + KafkaPartitionState info = entries.getValue(); + + if(partitionIsActive(part) && info.getMaxTimestamp() < minTimestamp.f1) { + minTimestamp.f0 = part; + minTimestamp.f1 = info.getMaxTimestamp(); + } + } + + if(minTimestamp.f0 != null) { + // it means that we have a winner and we have to set its flag to + // inactive, until its next non-late element. + KafkaTopicPartition partitionDescriptor = minTimestamp.f0; + setActiveFlagForPartition(partitionDescriptor, false); + } + + return minTimestamp; + } + + /** + * Sets the {@code active} flag for a given partition of a topic to {@code isActive}. + * This flag signals if the partition is still receiving data and it is used to avoid the case + * where a partition stops receiving data, so its max seen timestamp does not advance, and it + * holds back the progress of the watermark for all partitions. Note that if the partition is + * not known to the system, then a new {@link KafkaPartitionState} is created and is associated + * to the new partition for future monitoring. + * + * @param partDescriptor + * A descriptor containing the topic and the id of the partition. + * @param isActive + * The value {@code true} or {@code false} to set the flag to. + */ + private void setActiveFlagForPartition(KafkaTopicPartition partDescriptor, boolean isActive) { + KafkaPartitionState info = getOrInitializeInfo(partDescriptor); + info.setActive(isActive); + } + + /** + * Gets the statistics for a given partition specified by the {@code partition} argument. + * If it is the first time we see this partition, a new {@link KafkaPartitionState} data structure + * is initialized to monitor it from now on. This method never throws a {@link NullPointerException}. + * @param partition the partition to be fetched. + * @return the gathered statistics for that partition. + * */ + private KafkaPartitionState getOrInitializeInfo(KafkaTopicPartition partition) { + KafkaPartitionState info = partitionState.get(partition); + if(info == null) { + info = new KafkaPartitionState(partition.getPartition(), FlinkKafkaConsumerBase.OFFSET_NOT_SET); + partitionState.put(partition, info); + } + return info; + } + + /** + * Checks if a partition of a topic is still active, i.e. if it still receives data. 
+ * @param partDescriptor + * A descriptor containing the topic and the id of the partition. + * */ + private boolean partitionIsActive(KafkaTopicPartition partDescriptor) { + KafkaPartitionState info = partitionState.get(partDescriptor); + if(info == null) { + throw new RuntimeException("Unknown Partition: Topic=" + partDescriptor.getTopic() + + " Partition=" + partDescriptor.getPartition()); + } + return info.isActive(); + } + + private TimestampAssigner<T> getTimestampAssigner() { + checkEmitterStateAfterInit(); + return periodicWatermarkAssigner != null ? periodicWatermarkAssigner : punctuatedWatermarkAssigner; + } + + private void setNextWatermarkTimer() { + long timeToNextWatermark = System.currentTimeMillis() + watermarkInterval; + runtime.registerTimer(timeToNextWatermark, this); + } + + private void checkEmitterDuringInit() { + if(periodicWatermarkAssigner != null) { + throw new RuntimeException("A periodic watermark emitter has already been provided."); + } else if(punctuatedWatermarkAssigner != null) { + throw new RuntimeException("A punctuated watermark emitter has already been provided."); + } + } + + private void checkEmitterStateAfterInit() { + if(periodicWatermarkAssigner == null && punctuatedWatermarkAssigner == null) { + throw new RuntimeException("The timestamp assigner has not been initialized."); + } else if(periodicWatermarkAssigner != null && punctuatedWatermarkAssigner != null) { + throw new RuntimeException("The source can either have an assigner with punctuated " + + "watermarks or one with periodic watermarks, not both."); + } + } + // ------------------------------------------------------------------------ // Checkpoint and restore // ------------------------------------------------------------------------ + HashMap<KafkaTopicPartition, KafkaPartitionState> restoreInfoFromCheckpoint() { + HashMap<KafkaTopicPartition, KafkaPartitionState> partInfo = new HashMap<>(restoreToOffset.size()); + for(Map.Entry<KafkaTopicPartition, Long> offsets: restoreToOffset.entrySet()) { + KafkaTopicPartition key = offsets.getKey(); + partInfo.put(key, new KafkaPartitionState(key.getPartition(), offsets.getValue())); + } + return partInfo; + } + @Override public HashMap<KafkaTopicPartition, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - if (offsetsState == null) { + if (partitionState == null) { LOG.debug("snapshotState() requested on not yet opened source; returning null."); return null; } @@ -100,15 +434,16 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti return null; } + HashMap<KafkaTopicPartition, Long> currentOffsets = new HashMap<>(); + for (Map.Entry<KafkaTopicPartition, KafkaPartitionState> entry: partitionState.entrySet()) { + currentOffsets.put(entry.getKey(), entry.getValue().getOffset()); + } + if (LOG.isDebugEnabled()) { LOG.debug("Snapshotting state. 
Offsets: {}, checkpoint id: {}, timestamp: {}", - KafkaTopicPartition.toString(offsetsState), checkpointId, checkpointTimestamp); + KafkaTopicPartition.toString(currentOffsets), checkpointId, checkpointTimestamp); } - // the use of clone() is okay here is okay, we just need a new map, the keys are not changed - //noinspection unchecked - HashMap<KafkaTopicPartition, Long> currentOffsets = (HashMap<KafkaTopicPartition, Long>) offsetsState.clone(); - // the map cannot be asynchronously updated, because only one checkpoint call can happen // on this function at a time: either snapshotState() or notifyCheckpointComplete() pendingCheckpoints.put(checkpointId, currentOffsets); @@ -128,7 +463,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { - if (offsetsState == null) { + if (partitionState == null) { LOG.debug("notifyCheckpointComplete() called on uninitialized source"); return; } http://git-wip-us.apache.org/repos/asf/flink/blob/0ac1549e/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionState.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionState.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionState.java new file mode 100644 index 0000000..11a392a --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaPartitionState.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import java.io.Serializable; + +public class KafkaPartitionState implements Serializable { + + private static final long serialVersionUID = 722083576322742328L; + + private final int partitionID; + private long offset; + + private long maxTimestamp = Long.MIN_VALUE; + private boolean isActive = false; + + public KafkaPartitionState(int id, long offset) { + this.partitionID = id; + this.offset = offset; + } + + public void setOffset(long offset) { + this.offset = offset; + } + + public void setActive(boolean isActive) { + this.isActive = isActive; + } + + public void setMaxTimestamp(long timestamp) { + maxTimestamp = timestamp; + } + + public int getPartition() { + return partitionID; + } + + public boolean isActive() { + return isActive; + } + + public long getMaxTimestamp() { + return maxTimestamp; + } + + public long getOffset() { + return offset; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/0ac1549e/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 2cd59e6..340950b 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -32,6 +32,7 @@ import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; @@ -50,15 +51,21 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import 
http://git-wip-us.apache.org/repos/asf/flink/blob/0ac1549e/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 2cd59e6..340950b 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -32,6 +32,7 @@ import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -50,15 +51,21 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
 import org.apache.flink.streaming.connectors.kafka.testutils.DiscardingSink;
@@ -69,6 +76,7 @@ import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidating
 import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
@@ -374,7 +382,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		deleteTestTopic(topic);
 	}
 
-
 	/**
 	 * Tests the proper consumption when having a 1:1 correspondence between kafka partitions and
 	 * Flink sources.
@@ -1443,4 +1450,229 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			this.numElementsTotal = state;
 		}
 	}
+
+	///////////// Testing the Kafka consumer with embedded watermark generation functionality ///////////////
+
+	@RetryOnException(times=0, exception=kafka.common.NotLeaderForPartitionException.class)
+	public void runExplicitPunctuatedWMgeneratingConsumerTest(boolean emptyPartition) throws Exception {
+
+		final String topic1 = "wmExtractorTopic1_" + UUID.randomUUID().toString();
+		final String topic2 = "wmExtractorTopic2_" + UUID.randomUUID().toString();
+
+		final Map<String, Boolean> topics = new HashMap<>();
+		topics.put(topic1, false);
+		topics.put(topic2, emptyPartition);
+
+		final int noOfTopics = topics.size();
+		final int partitionsPerTopic = 1;
+		final int elementsPerPartition = 100 + 1;
+
+		final int totalElements = emptyPartition ?
+				partitionsPerTopic * elementsPerPartition :
+				noOfTopics * partitionsPerTopic * elementsPerPartition;
+
+		createTestTopic(topic1, partitionsPerTopic, 1);
+		createTestTopic(topic2, partitionsPerTopic, 1);
+
+		final StreamExecutionEnvironment env =
+				StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.setParallelism(partitionsPerTopic);
+		env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
+		env.getConfig().disableSysoutLogging();
+
+		TypeInformation<Tuple2<Long, Integer>> longIntType = TypeInfoParser.parse("Tuple2<Long, Integer>");
+
+		Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
+		producerProperties.setProperty("retries", "0");
+
+		putDataInTopics(env, producerProperties, elementsPerPartition, topics, longIntType);
+
+		List<String> topicTitles = new ArrayList<>(topics.keySet());
+		runPunctuatedConsumer(env, topicTitles, totalElements, longIntType);
+
+		executeAndCatchException(env, "runConsumerWithPunctuatedExplicitWMTest");
+
+		for(String topic: topicTitles) {
+			deleteTestTopic(topic);
+		}
+	}
+
+	private void executeAndCatchException(StreamExecutionEnvironment env, String execName) throws Exception {
+		try {
+			tryExecutePropagateExceptions(env, execName);
+		}
+		catch (ProgramInvocationException | JobExecutionException e) {
+			// look for a NotLeaderForPartitionException nested in the failure cause
+			Throwable cause = e.getCause();
+
+			// walk the chain of nested causes (bounded, to guard against cyclic causes)
+			int depth = 0;
+			while (cause != null && depth++ < 20) {
+				if (cause instanceof kafka.common.NotLeaderForPartitionException) {
+					throw (Exception) cause;
+				}
+				cause = cause.getCause();
+			}
+			throw e;
+		}
+	}
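The unwrapping loop above generalizes to any target exception type. A small sketch of the same pattern (findCause() is a hypothetical helper, not part of the test base):

    // Walk the cause chain (bounded, to guard against cyclic causes) and return
    // the first cause of the wanted type, or null if none is found.
    static <E extends Throwable> E findCause(Throwable root, Class<E> wanted) {
        Throwable cause = root.getCause();
        int depth = 0;
        while (cause != null && depth++ < 20) {
            if (wanted.isInstance(cause)) {
                return wanted.cast(cause);
            }
            cause = cause.getCause();
        }
        return null;
    }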
+
+	private void putDataInTopics(StreamExecutionEnvironment env,
+								Properties producerProperties,
+								final int elementsPerPartition,
+								Map<String, Boolean> topics,
+								TypeInformation<Tuple2<Long, Integer>> outputTypeInfo) {
+		if(topics.size() != 2) {
+			throw new RuntimeException("This method accepts two topics as arguments.");
+		}
+
+		TypeInformationSerializationSchema<Tuple2<Long, Integer>> sinkSchema =
+				new TypeInformationSerializationSchema<>(outputTypeInfo, env.getConfig());
+
+		DataStream<Tuple2<Long, Integer>> stream = env
+				.addSource(new RichParallelSourceFunction<Tuple2<Long, Integer>>() {
+					private boolean running = true;
+
+					@Override
+					public void run(SourceContext<Tuple2<Long, Integer>> ctx) throws InterruptedException {
+						int topic = 0;
+						int currentTs = 1;
+
+						while (running && currentTs < elementsPerPartition) {
+							long timestamp = (currentTs % 10 == 0) ? -1L : currentTs;
+							ctx.collect(new Tuple2<Long, Integer>(timestamp, topic));
+							currentTs++;
+						}
+
+						Tuple2<Long, Integer> toWrite2 = new Tuple2<Long, Integer>(-1L, topic);
+						ctx.collect(toWrite2);
+					}
+
+					@Override
+					public void cancel() {
+						running = false;
+					}
+				}).setParallelism(1);
+
+		List<Map.Entry<String, Boolean>> topicsL = new ArrayList<>(topics.entrySet());
+		stream.map(new MapFunction<Tuple2<Long,Integer>, Tuple2<Long,Integer>>() {
+
+			@Override
+			public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
+				return value;
+			}
+		}).setParallelism(1).addSink(kafkaServer.getProducer(topicsL.get(0).getKey(),
+				new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null)).setParallelism(1);
+
+		if(!topicsL.get(1).getValue()) {
+			stream.map(new MapFunction<Tuple2<Long,Integer>, Tuple2<Long,Integer>>() {
+
+				@Override
+				public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
+					long timestamp = (value.f0 == -1) ? -1L : 1000 + value.f0;
+					return new Tuple2<Long, Integer>(timestamp, 1);
+				}
+			}).setParallelism(1).addSink(kafkaServer.getProducer(topicsL.get(1).getKey(),
+					new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null)).setParallelism(1);
+		}
+	}
+
+	private DataStreamSink<Tuple2<Long, Integer>> runPunctuatedConsumer(StreamExecutionEnvironment env,
+										List<String> topics,
+										final int totalElementsToExpect,
+										TypeInformation<Tuple2<Long, Integer>> inputTypeInfo) {
+
+		TypeInformationSerializationSchema<Tuple2<Long, Integer>> sourceSchema =
+				new TypeInformationSerializationSchema<>(inputTypeInfo, env.getConfig());
+
+		FlinkKafkaConsumerBase<Tuple2<Long, Integer>> source = kafkaServer
+				.getConsumer(topics, sourceSchema, standardProps)
+				.setPunctuatedWatermarkEmitter(new TestPunctuatedTSExtractor());
+
+		DataStreamSource<Tuple2<Long, Integer>> consuming = env.setParallelism(1).addSource(source);
+
+		return consuming
+				.transform("testingWatermarkOperator", inputTypeInfo, new WMTestingOperator())
+				.addSink(new RichSinkFunction<Tuple2<Long, Integer>>() {
+
+					private int elementCount = 0;
+
+					@Override
+					public void invoke(Tuple2<Long, Integer> value) throws Exception {
+						elementCount++;
+						if (elementCount == totalElementsToExpect) {
+							throw new SuccessException();
+						}
+					}
+
+					@Override
+					public void close() throws Exception {
+						super.close();
+					}
+				});
+	}
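The extractor and operator below check the invariant this commit introduces: a watermark emitted by the consumer must equal the minimum of the per-partition maximum timestamps, taken over partitions that have already seen data. Expressed against the new KafkaPartitionState API, the rule looks roughly like this (the helper itself is illustrative, not the consumer's actual code):

    // Illustrative only: the cross-partition watermark rule the test verifies.
    // Inactive partitions (no element yet) are skipped, so an empty partition
    // does not pin the watermark at Long.MIN_VALUE.
    static long minTimestampAcrossActivePartitions(Map<KafkaTopicPartition, KafkaPartitionState> partitions) {
        long min = Long.MAX_VALUE;
        for (KafkaPartitionState state : partitions.values()) {
            if (state.isActive()) {
                min = Math.min(min, state.getMaxTimestamp());
            }
        }
        return min;
    }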
+
+	/** An extractor that emits a Watermark whenever the timestamp <b>in the record</b> is equal to {@code -1}. */
+	private static class TestPunctuatedTSExtractor implements AssignerWithPunctuatedWatermarks<Tuple2<Long, Integer>> {
+
+		@Override
+		public Watermark checkAndGetNextWatermark(Tuple2<Long, Integer> lastElement, long extractedTimestamp) {
+			return (lastElement.f0 == -1) ? new Watermark(extractedTimestamp) : null;
+		}
+
+		@Override
+		public long extractTimestamp(Tuple2<Long, Integer> element, long previousElementTimestamp) {
+			return element.f0;
+		}
+	}
+
+	private static class WMTestingOperator extends AbstractStreamOperator<Tuple2<Long, Integer>> implements OneInputStreamOperator<Tuple2<Long, Integer>, Tuple2<Long, Integer>> {
+
+		private long lastReceivedWatermark = Long.MIN_VALUE;
+
+		private Map<Integer, Boolean> isEligible;
+		private Map<Integer, Long> perPartitionMaxTs;
+
+		WMTestingOperator() {
+			isEligible = new HashMap<>();
+			perPartitionMaxTs = new HashMap<>();
+		}
+
+		@Override
+		public void processElement(StreamRecord<Tuple2<Long, Integer>> element) throws Exception {
+			int partition = element.getValue().f1;
+			Long maxTs = perPartitionMaxTs.get(partition);
+			if(maxTs == null || maxTs < element.getValue().f0) {
+				perPartitionMaxTs.put(partition, element.getValue().f0);
+				isEligible.put(partition, element.getValue().f0 > lastReceivedWatermark);
+			}
+			output.collect(element);
+		}
+
+		@Override
+		public void processWatermark(Watermark mark) throws Exception {
+			int partition = -1;
+			long minTS = Long.MAX_VALUE;
+			for (Integer part : perPartitionMaxTs.keySet()) {
+				Long ts = perPartitionMaxTs.get(part);
+				if (ts < minTS && isEligible.get(part)) {
+					partition = part;
+					minTS = ts;
+					lastReceivedWatermark = ts;
+				}
+			}
+			isEligible.put(partition, false);
+
+			assertEquals(minTS, mark.getTimestamp());
+			output.emitWatermark(mark);
+		}
+
+		@Override
+		public void close() throws Exception {
+			super.close();
+			perPartitionMaxTs.clear();
+			isEligible.clear();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0ac1549e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java
index 38ee394..4b17300 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java
@@ -29,7 +29,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
  *
  * <p>Use this class to generate watermarks in a periodical interval.
  * At most every {@code i} milliseconds (configured via
- * {@link ExecutionConfig#getAutoWatermarkInterval()}, the system will call the
+ * {@link ExecutionConfig#getAutoWatermarkInterval()}), the system will call the
  * {@link #getCurrentWatermark()} method to probe for the next watermark value.
 * The system will generate a new watermark, if the probed value is non-null
 * and has a timestamp larger than that of the previous watermark (to preserve
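To round off the javadoc fix above: a minimal periodic-assigner sketch for the same Tuple2<Long, Integer> records used in the tests. The one-second lateness bound and the class name are arbitrary illustrations; only extractTimestamp() and getCurrentWatermark() are actual interface methods:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

    public class BoundedLatenessAssigner implements AssignerWithPeriodicWatermarks<Tuple2<Long, Integer>> {

        private static final long MAX_LATENESS = 1000L; // ms; arbitrary bound for illustration

        private long maxSeenTimestamp = Long.MIN_VALUE;

        @Override
        public long extractTimestamp(Tuple2<Long, Integer> element, long previousElementTimestamp) {
            maxSeenTimestamp = Math.max(maxSeenTimestamp, element.f0);
            return element.f0;
        }

        @Override
        public Watermark getCurrentWatermark() {
            // probed at most every getAutoWatermarkInterval() milliseconds; the system
            // emits a new watermark only if this value advanced past the previous one
            return maxSeenTimestamp == Long.MIN_VALUE
                    ? new Watermark(Long.MIN_VALUE) // no data yet; avoid underflow
                    : new Watermark(maxSeenTimestamp - MAX_LATENESS);
        }
    }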