[FLINK-3123] [kafka] Allow custom specific start offsets for FlinkKafkaConsumer
This closes #2687. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5f08e535 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5f08e535 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5f08e535 Branch: refs/heads/master Commit: 5f08e53592ebd29cfcd8ee486fcfd6229b82aa69 Parents: f214317 Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org> Authored: Fri Mar 10 21:11:42 2017 +0800 Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org> Committed: Mon Mar 13 23:38:13 2017 +0800 ---------------------------------------------------------------------- docs/dev/connectors/kafka.md | 35 ++++++- .../connectors/kafka/Kafka010ITCase.java | 4 + .../connectors/kafka/Kafka08ITCase.java | 9 +- .../connectors/kafka/Kafka09ITCase.java | 4 + .../kafka/FlinkKafkaConsumerBase.java | 101 ++++++++++++++++++- .../connectors/kafka/config/StartupMode.java | 9 +- .../KafkaConsumerPartitionAssignmentTest.java | 33 ++++-- .../connectors/kafka/KafkaConsumerTestBase.java | 82 +++++++++++++-- 8 files changed, 251 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/docs/dev/connectors/kafka.md ---------------------------------------------------------------------- diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md index 06e40b2..6d58b0c 100644 --- a/docs/dev/connectors/kafka.md +++ b/docs/dev/connectors/kafka.md @@ -220,7 +220,40 @@ All versions of the Flink Kafka Consumer have the above explicit configuration m record. Under these modes, committed offsets in Kafka will be ignored and not used as starting positions. 
-Note that these settings do not affect the start position when the job is +You can also specify the exact offsets the consumer should start from for each partition: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>(); +specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L); +specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L); +specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L); + +myConsumer.setStartFromSpecificOffsets(specificStartOffsets); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]() +specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L) +specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L) +specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L) + +myConsumer.setStartFromSpecificOffsets(specificStartOffsets) +{% endhighlight %} +</div> +</div> + +The above example configures the consumer to start from the specified offsets for +partitions 0, 1, and 2 of topic `myTopic`. The offset values should be the offset of the +next record that the consumer should read for each partition. Note that +if the consumer needs to read a partition which does not have a specified +offset within the provided offsets map, it will fall back to the default +group offsets behaviour (i.e. `setStartFromGroupOffsets()`) for that +particular partition. + +Note that these start position configuration methods do not affect the start position when the job is automatically restored from a failure or manually restored using a savepoint. 
On restore, the start position of each Kafka partition is determined by the offsets stored in the savepoint or checkpoint http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java index a375fb6..2085169 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java @@ -147,6 +147,10 @@ public class Kafka010ITCase extends KafkaConsumerTestBase { runStartFromGroupOffsets(); } + @Test(timeout = 60000) + public void testStartFromSpecificOffsets() throws Exception { + runStartFromSpecificOffsets(); + } // --- offset committing --- http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java index 3fc00e9..2e7c368 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java @@ -90,7 +90,7 @@ public 
class Kafka08ITCase extends KafkaConsumerTestBase { final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); env.getConfig().disableSysoutLogging(); - readSequence(env, StartupMode.GROUP_OFFSETS, standardProps, parallelism, topic, valuesCount, startFrom); + readSequence(env, StartupMode.GROUP_OFFSETS, null, standardProps, parallelism, topic, valuesCount, startFrom); deleteTestTopic(topic); } @@ -136,6 +136,11 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { runStartFromGroupOffsets(); } + @Test(timeout = 60000) + public void testStartFromSpecificOffsets() throws Exception { + runStartFromSpecificOffsets(); + } + // --- offset committing --- @Test(timeout = 60000) @@ -200,7 +205,7 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { readProps.setProperty("auto.commit.interval.ms", "500"); // read so that the offset can be committed to ZK - readSequence(env, StartupMode.GROUP_OFFSETS, readProps, parallelism, topicName, 100, 0); + readSequence(env, StartupMode.GROUP_OFFSETS, null, readProps, parallelism, topicName, 100, 0); // get the offset CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient(); http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java index 6added7..ca9965c 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java +++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java @@ -127,6 +127,10 @@ public class Kafka09ITCase extends KafkaConsumerTestBase { runStartFromGroupOffsets(); } + @Test(timeout = 60000) + public void testStartFromSpecificOffsets() throws Exception { + runStartFromSpecificOffsets(); + } // --- offset committing --- http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java index 144ede8..027751c 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java @@ -40,11 +40,13 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.util.SerializedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -104,6 +106,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti /** The startup mode for the consumer (default is {@link StartupMode#GROUP_OFFSETS}) */ protected StartupMode startupMode = StartupMode.GROUP_OFFSETS; + /** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS} */ + protected Map<KafkaTopicPartition, Long> specificStartupOffsets; + // ------------------------------------------------------------------------ // runtime state (used individually by each parallel subtask) // ------------------------------------------------------------------------ @@ -210,23 +215,33 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti /** * Specifies the consumer to start reading from the earliest offset for all partitions. - * This ignores any committed group offsets in Zookeeper / Kafka brokers. + * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers. + * + * This method does not affect where partitions are read from when the consumer is restored + * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or + * savepoint, only the offsets in the restored state will be used. * * @return The consumer object, to allow function chaining. */ public FlinkKafkaConsumerBase<T> setStartFromEarliest() { this.startupMode = StartupMode.EARLIEST; + this.specificStartupOffsets = null; return this; } /** * Specifies the consumer to start reading from the latest offset for all partitions. - * This ignores any committed group offsets in Zookeeper / Kafka brokers. + * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers. + * + * This method does not affect where partitions are read from when the consumer is restored + * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or + * savepoint, only the offsets in the restored state will be used. * * @return The consumer object, to allow function chaining. 
*/ public FlinkKafkaConsumerBase<T> setStartFromLatest() { this.startupMode = StartupMode.LATEST; + this.specificStartupOffsets = null; return this; } @@ -236,10 +251,41 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti * properties. If no offset can be found for a partition, the behaviour in "auto.offset.reset" * set in the configuration properties will be used for the partition. * + * This method does not affect where partitions are read from when the consumer is restored + * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or + * savepoint, only the offsets in the restored state will be used. + * * @return The consumer object, to allow function chaining. */ public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets() { this.startupMode = StartupMode.GROUP_OFFSETS; + this.specificStartupOffsets = null; + return this; + } + + /** + * Specifies the consumer to start reading partitions from specific offsets, set independently for each partition. + * The specified offset should be the offset of the next record that will be read from partitions. + * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers. + * + * If the provided map of offsets contains entries whose {@link KafkaTopicPartition} is not subscribed by the + * consumer, the entry will be ignored. If the consumer subscribes to a partition that does not exist in the provided + * map of offsets, the consumer will fall back to the default group offset behaviour (see + * {@link FlinkKafkaConsumerBase#setStartFromGroupOffsets()}) for that particular partition.
+ * + * If the specified offset for a partition is invalid, or the behaviour for that partition is defaulted to group + * offsets but still no group offset could be found for it, then the "auto.offset.reset" behaviour set in the + * configuration properties will be used for the partition. + * + * This method does not affect where partitions are read from when the consumer is restored + * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or + * savepoint, only the offsets in the restored state will be used. + * + * @return The consumer object, to allow function chaining. + */ + public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets) { + this.startupMode = StartupMode.SPECIFIC_OFFSETS; + this.specificStartupOffsets = checkNotNull(specificStartupOffsets); return this; } @@ -269,7 +315,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti kafkaTopicPartitions, getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks(), - startupMode); + startupMode, + specificStartupOffsets); if (subscribedPartitionsToStartOffsets.size() != 0) { switch (startupMode) { @@ -285,6 +332,28 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets.keySet()); break; + case SPECIFIC_OFFSETS: + LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}", + getRuntimeContext().getIndexOfThisSubtask(), + subscribedPartitionsToStartOffsets.size(), + specificStartupOffsets, + subscribedPartitionsToStartOffsets.keySet()); + + List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size()); + for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) { + if
(subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) { + partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey()); + } + } + + if (partitionsDefaultedToGroupOffsets.size() > 0) { + LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" + + "; their startup offsets will be defaulted to their committed group offsets in Kafka.", + getRuntimeContext().getIndexOfThisSubtask(), + partitionsDefaultedToGroupOffsets.size(), + partitionsDefaultedToGroupOffsets); + } + break; default: case GROUP_OFFSETS: LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}", @@ -550,6 +619,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti * @param indexOfThisSubtask the index of this consumer instance * @param numParallelSubtasks total number of parallel consumer instances * @param startupMode the configured startup mode for the consumer + * @param specificStartupOffsets specific partition offsets to start from + * (only relevant if startupMode is {@link StartupMode#SPECIFIC_OFFSETS}) * * Note: This method is also exposed for testing. 
*/ @@ -558,11 +629,31 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti List<KafkaTopicPartition> kafkaTopicPartitions, int indexOfThisSubtask, int numParallelSubtasks, - StartupMode startupMode) { + StartupMode startupMode, + Map<KafkaTopicPartition, Long> specificStartupOffsets) { for (int i = 0; i < kafkaTopicPartitions.size(); i++) { if (i % numParallelSubtasks == indexOfThisSubtask) { - subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel()); + if (startupMode != StartupMode.SPECIFIC_OFFSETS) { + subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel()); + } else { + if (specificStartupOffsets == null) { + throw new IllegalArgumentException( + "Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS + + ", but no specific offsets were specified"); + } + + KafkaTopicPartition partition = kafkaTopicPartitions.get(i); + + Long specificOffset = specificStartupOffsets.get(partition); + if (specificOffset != null) { + // since the specified offsets represent the next record to read, we subtract + // one from the offset so that the initial state of the consumer will be correct + subscribedPartitionsToStartOffsets.put(partition, specificOffset - 1); + } else { + subscribedPartitionsToStartOffsets.put(partition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET); + } + } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java index f796e62..8fc2fe0 100644 --- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java @@ -30,7 +30,14 @@ public enum StartupMode { EARLIEST(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET), /** Start from the latest offset */ - LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET); + LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET), + + /** + * Start from user-supplied specific offsets for each partition. + * Since this mode will have specific offsets to start with, we do not need a sentinel value; + * using Long.MIN_VALUE as a placeholder. + */ + SPECIFIC_OFFSETS(Long.MIN_VALUE); /** The sentinel offset value corresponding to this startup mode */ private long stateSentinel; http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java index 379d53a..c24640d 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java @@ -56,7 +56,8 @@ public class KafkaConsumerPartitionAssignmentTest { inPartitions, i, inPartitions.size(), - StartupMode.GROUP_OFFSETS); + StartupMode.GROUP_OFFSETS, + null); List<KafkaTopicPartition> subscribedPartitions = 
new ArrayList<>(subscribedPartitionsToStartOffsets.keySet()); @@ -95,7 +96,8 @@ public class KafkaConsumerPartitionAssignmentTest { partitions, i, numConsumers, - StartupMode.GROUP_OFFSETS); + StartupMode.GROUP_OFFSETS, + null); List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet()); @@ -138,7 +140,8 @@ public class KafkaConsumerPartitionAssignmentTest { inPartitions, i, numConsumers, - StartupMode.GROUP_OFFSETS); + StartupMode.GROUP_OFFSETS, + null); List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet()); @@ -169,7 +172,8 @@ public class KafkaConsumerPartitionAssignmentTest { ep, 2, 4, - StartupMode.GROUP_OFFSETS); + StartupMode.GROUP_OFFSETS, + null); assertTrue(subscribedPartitionsToStartOffsets.entrySet().isEmpty()); subscribedPartitionsToStartOffsets = new HashMap<>(); @@ -178,7 +182,8 @@ public class KafkaConsumerPartitionAssignmentTest { ep, 0, 1, - StartupMode.GROUP_OFFSETS); + StartupMode.GROUP_OFFSETS, + null); assertTrue(subscribedPartitionsToStartOffsets.entrySet().isEmpty()); } catch (Exception e) { @@ -218,21 +223,24 @@ public class KafkaConsumerPartitionAssignmentTest { initialPartitions, 0, numConsumers, - StartupMode.GROUP_OFFSETS); + StartupMode.GROUP_OFFSETS, + null); FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets( subscribedPartitionsToStartOffsets2, initialPartitions, 1, numConsumers, - StartupMode.GROUP_OFFSETS); + StartupMode.GROUP_OFFSETS, + null); FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets( subscribedPartitionsToStartOffsets3, initialPartitions, 2, numConsumers, - StartupMode.GROUP_OFFSETS); + StartupMode.GROUP_OFFSETS, + null); List<KafkaTopicPartition> subscribedPartitions1 = new ArrayList<>(subscribedPartitionsToStartOffsets1.keySet()); List<KafkaTopicPartition> subscribedPartitions2 = new ArrayList<>(subscribedPartitionsToStartOffsets2.keySet()); @@ -274,21 +282,24 @@ public class 
KafkaConsumerPartitionAssignmentTest { newPartitions, 0, numConsumers, - StartupMode.GROUP_OFFSETS); + StartupMode.GROUP_OFFSETS, + null); FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets( subscribedPartitionsToStartOffsets2, newPartitions, 1, numConsumers, - StartupMode.GROUP_OFFSETS); + StartupMode.GROUP_OFFSETS, + null); FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets( subscribedPartitionsToStartOffsets3, newPartitions, 2, numConsumers, - StartupMode.GROUP_OFFSETS); + StartupMode.GROUP_OFFSETS, + null); List<KafkaTopicPartition> subscribedPartitions1New = new ArrayList<>(subscribedPartitionsToStartOffsets1.keySet()); List<KafkaTopicPartition> subscribedPartitions2New = new ArrayList<>(subscribedPartitionsToStartOffsets2.keySet()); http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 580c507..ddac61c 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -62,6 +62,7 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators; import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper; import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils; @@ -349,7 +350,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { (o3 != null) ? o3.intValue() : 0 )); - readSequence(env2, StartupMode.GROUP_OFFSETS, standardProps, topicName, partitionsToValuesCountAndStartOffset); + readSequence(env2, StartupMode.GROUP_OFFSETS, null, standardProps, topicName, partitionsToValuesCountAndStartOffset); kafkaOffsetHandler.close(); deleteTestTopic(topicName); @@ -465,7 +466,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31); kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43); - readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0); + readSequence(env, StartupMode.EARLIEST, null, readProps, parallelism, topicName, recordsInEachPartition, 0); kafkaOffsetHandler.close(); deleteTestTopic(topicName); @@ -619,7 +620,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { * partition 2 --> start from offset 43, read to offset 49 (7 records) */ public void runStartFromGroupOffsets() throws Exception { - // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50) + // 3 partitions with 50 records each (offsets 0-49) final int parallelism = 3; final int recordsInEachPartition = 50; @@ -645,7 +646,71 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { partitionsToValueCountAndStartOffsets.put(1, new Tuple2<>(50, 0)); // partition 1 should read offset 0-49 partitionsToValueCountAndStartOffsets.put(2, new Tuple2<>(7, 43)); // partition 2 should read offset 43-49 - readSequence(env, 
StartupMode.GROUP_OFFSETS, readProps, topicName, partitionsToValueCountAndStartOffsets); + readSequence(env, StartupMode.GROUP_OFFSETS, null, readProps, topicName, partitionsToValueCountAndStartOffsets); + + kafkaOffsetHandler.close(); + deleteTestTopic(topicName); + } + + /** + * This test ensures that the consumer correctly uses user-supplied specific offsets when explicitly configured to + * start from specific offsets. For partitions for which no specific offset can be found, the starting position + * for them should fall back to the group offsets behaviour. + * + * 4 partitions will have 50 records with offsets 0 to 49. The supplied specific offsets map is: + * partition 0 --> start from offset 19 + * partition 1 --> not set + * partition 2 --> start from offset 22 + * partition 3 --> not set + * partition 4 --> start from offset 26 (this should be ignored because the partition does not exist) + * + * The partitions and their committed group offsets are set up as: + * partition 0 --> committed offset 23 + * partition 1 --> committed offset 31 + * partition 2 --> committed offset 43 + * partition 3 --> no committed offset + * + * When configured to start from these specific offsets, each partition should read: + * partition 0 --> start from offset 19, read to offset 49 (31 records) + * partition 1 --> falls back to group offsets, so start from offset 31, read to offset 49 (19 records) + * partition 2 --> start from offset 22, read to offset 49 (28 records) + * partition 3 --> falls back to group offsets, but since there is no group offset for this partition, + * will default to "auto.offset.reset" (set to "earliest"), + * so start from offset 0, read to offset 49 (50 records) + */ + public void runStartFromSpecificOffsets() throws Exception { + // 4 partitions with 50 records each (offsets 0-49) + final int parallelism = 4; + final int recordsInEachPartition = 50; + + final String topicName = writeSequence("testStartFromSpecificOffsetsTopic", recordsInEachPartition,
parallelism, 1); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.getConfig().disableSysoutLogging(); + env.setParallelism(parallelism); + + Properties readProps = new Properties(); + readProps.putAll(standardProps); + readProps.setProperty("auto.offset.reset", "earliest"); // partition 3 should default back to this behaviour + + Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>(); + specificStartupOffsets.put(new KafkaTopicPartition(topicName, 0), 19L); + specificStartupOffsets.put(new KafkaTopicPartition(topicName, 2), 22L); + specificStartupOffsets.put(new KafkaTopicPartition(topicName, 4), 26L); // non-existing partition, should be ignored + + // only the committed offset for partition 1 should be used, because partition 1 has no entry in specific offset map + KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(); + kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23); + kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31); + kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43); + + Map<Integer, Tuple2<Integer, Integer>> partitionsToValueCountAndStartOffsets = new HashMap<>(); + partitionsToValueCountAndStartOffsets.put(0, new Tuple2<>(31, 19)); // partition 0 should read offset 19-49 + partitionsToValueCountAndStartOffsets.put(1, new Tuple2<>(19, 31)); // partition 1 should read offset 31-49 + partitionsToValueCountAndStartOffsets.put(2, new Tuple2<>(28, 22)); // partition 2 should read offset 22-49 + partitionsToValueCountAndStartOffsets.put(3, new Tuple2<>(50, 0)); // partition 3 should read offset 0-49 + + readSequence(env, StartupMode.SPECIFIC_OFFSETS, specificStartupOffsets, readProps, topicName, partitionsToValueCountAndStartOffsets); kafkaOffsetHandler.close(); deleteTestTopic(topicName); @@ -1781,6 +1846,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { */ protected void 
readSequence(final StreamExecutionEnvironment env, final StartupMode startupMode, + final Map<KafkaTopicPartition, Long> specificStartupOffsets, final Properties cc, final String topicName, final Map<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset) throws Exception { @@ -1807,6 +1873,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { case LATEST: consumer.setStartFromLatest(); break; + case SPECIFIC_OFFSETS: + consumer.setStartFromSpecificOffsets(specificStartupOffsets); + break; case GROUP_OFFSETS: consumer.setStartFromGroupOffsets(); break; @@ -1874,11 +1943,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { } /** - * Variant of {@link KafkaConsumerTestBase#readSequence(StreamExecutionEnvironment, StartupMode, Properties, String, Map)} to + * Variant of {@link KafkaConsumerTestBase#readSequence(StreamExecutionEnvironment, StartupMode, Map, Properties, String, Map)} to * expect reading from the same start offset and the same value count for all partitions of a single Kafka topic. */ protected void readSequence(final StreamExecutionEnvironment env, final StartupMode startupMode, + final Map<KafkaTopicPartition, Long> specificStartupOffsets, final Properties cc, final int sourceParallelism, final String topicName, @@ -1888,7 +1958,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { for (int i = 0; i < sourceParallelism; i++) { partitionsToValuesCountAndStartOffset.put(i, new Tuple2<>(valuesCount, startFrom)); } - readSequence(env, startupMode, cc, topicName, partitionsToValuesCountAndStartOffset); + readSequence(env, startupMode, specificStartupOffsets, cc, topicName, partitionsToValuesCountAndStartOffset); } protected String writeSequence(