[FLINK-3123] [kafka] Allow custom specific start offsets for FlinkKafkaConsumer

This closes #2687.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5f08e535
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5f08e535
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5f08e535

Branch: refs/heads/master
Commit: 5f08e53592ebd29cfcd8ee486fcfd6229b82aa69
Parents: f214317
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Authored: Fri Mar 10 21:11:42 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Mon Mar 13 23:38:13 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md                    |  35 ++++++-
 .../connectors/kafka/Kafka010ITCase.java        |   4 +
 .../connectors/kafka/Kafka08ITCase.java         |   9 +-
 .../connectors/kafka/Kafka09ITCase.java         |   4 +
 .../kafka/FlinkKafkaConsumerBase.java           | 101 ++++++++++++++++++-
 .../connectors/kafka/config/StartupMode.java    |   9 +-
 .../KafkaConsumerPartitionAssignmentTest.java   |  33 ++++--
 .../connectors/kafka/KafkaConsumerTestBase.java |  82 +++++++++++++--
 8 files changed, 251 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 06e40b2..6d58b0c 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -220,7 +220,40 @@ All versions of the Flink Kafka Consumer have the above explicit configuration m
 record. Under these modes, committed offsets in Kafka will be ignored and
 not used as starting positions.
 
-Note that these settings do not affect the start position when the job is
+You can also specify the exact offsets the consumer should start from for each partition:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);
+
+myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L)
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L)
+
+myConsumer.setStartFromSpecificOffsets(specificStartOffsets)
+{% endhighlight %}
+</div>
+</div>
+
+The above example configures the consumer to start from the specified offsets for
+partitions 0, 1, and 2 of topic `myTopic`. The specified offset for each partition
+should be the offset of the next record that the consumer will read. Note that
+if the consumer needs to read a partition that does not have a specified
+offset within the provided offsets map, it will fall back to the default
+group offsets behaviour (i.e. `setStartFromGroupOffsets()`) for that
+particular partition.
+
+Note that these start position configuration methods do not affect the start position when the job is
 automatically restored from a failure or manually restored using a savepoint.
 On restore, the start position of each Kafka partition is determined by the
 offsets stored in the savepoint or checkpoint
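
For orientation, a minimal end-to-end sketch of how the new setter slots into a job, assuming the Kafka 0.10 consumer and a simple string schema (the broker address, group id, and job name below are illustrative placeholders, not part of this commit):

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class SpecificOffsetsJobSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
		props.setProperty("group.id", "test-group");              // placeholder

		FlinkKafkaConsumer010<String> myConsumer =
			new FlinkKafkaConsumer010<>("myTopic", new SimpleStringSchema(), props);

		// start positions: the offset of the next record to read, per partition
		Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
		specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
		specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);

		// partitions without an entry (e.g. partition 2) fall back to group offsets
		myConsumer.setStartFromSpecificOffsets(specificStartOffsets);

		env.addSource(myConsumer).print();
		env.execute("specific-offsets-sketch");
	}
}

Any partition without an entry in the map starts from its committed group offset, so the map only needs to cover the partitions that actually need pinning.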

http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index a375fb6..2085169 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -147,6 +147,10 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
                runStartFromGroupOffsets();
        }
 
+       @Test(timeout = 60000)
+       public void testStartFromSpecificOffsets() throws Exception {
+               runStartFromSpecificOffsets();
+       }
 
        // --- offset committing ---
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 3fc00e9..2e7c368 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -90,7 +90,7 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
                final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
                env.getConfig().disableSysoutLogging();
 
-               readSequence(env, StartupMode.GROUP_OFFSETS, standardProps, parallelism, topic, valuesCount, startFrom);
+               readSequence(env, StartupMode.GROUP_OFFSETS, null, standardProps, parallelism, topic, valuesCount, startFrom);
 
                deleteTestTopic(topic);
        }
@@ -136,6 +136,11 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
                runStartFromGroupOffsets();
        }
 
+       @Test(timeout = 60000)
+       public void testStartFromSpecificOffsets() throws Exception {
+               runStartFromSpecificOffsets();
+       }
+
        // --- offset committing ---
 
        @Test(timeout = 60000)
@@ -200,7 +205,7 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
                readProps.setProperty("auto.commit.interval.ms", "500");
 
                // read so that the offset can be committed to ZK
-               readSequence(env, StartupMode.GROUP_OFFSETS, readProps, parallelism, topicName, 100, 0);
+               readSequence(env, StartupMode.GROUP_OFFSETS, null, readProps, parallelism, topicName, 100, 0);
 
                // get the offset
                CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();

http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index 6added7..ca9965c 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -127,6 +127,10 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
                runStartFromGroupOffsets();
        }
 
+       @Test(timeout = 60000)
+       public void testStartFromSpecificOffsets() throws Exception {
+               runStartFromSpecificOffsets();
+       }
 
        // --- offset committing ---
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 144ede8..027751c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -40,11 +40,13 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -104,6 +106,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
        /** The startup mode for the consumer (default is {@link StartupMode#GROUP_OFFSETS}) */
        protected StartupMode startupMode = StartupMode.GROUP_OFFSETS;
 
+       /** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS} */
+       protected Map<KafkaTopicPartition, Long> specificStartupOffsets;
+
        // ------------------------------------------------------------------------
        //  runtime state (used individually by each parallel subtask)
        // ------------------------------------------------------------------------
@@ -210,23 +215,33 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 
        /**
         * Specifies the consumer to start reading from the earliest offset for all partitions.
-        * This ignores any committed group offsets in Zookeeper / Kafka brokers.
+        * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+        *
+        * This method does not affect where partitions are read from when the consumer is restored
+        * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+        * savepoint, only the offsets in the restored state will be used.
         *
         * @return The consumer object, to allow function chaining.
         */
        public FlinkKafkaConsumerBase<T> setStartFromEarliest() {
                this.startupMode = StartupMode.EARLIEST;
+               this.specificStartupOffsets = null;
                return this;
        }
 
        /**
         * Specifies the consumer to start reading from the latest offset for all partitions.
-        * This ignores any committed group offsets in Zookeeper / Kafka brokers.
+        * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+        *
+        * This method does not affect where partitions are read from when the consumer is restored
+        * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+        * savepoint, only the offsets in the restored state will be used.
         *
         * @return The consumer object, to allow function chaining.
         */
        public FlinkKafkaConsumerBase<T> setStartFromLatest() {
                this.startupMode = StartupMode.LATEST;
+               this.specificStartupOffsets = null;
                return this;
        }
 
@@ -236,10 +251,41 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
         * properties. If no offset can be found for a partition, the behaviour in "auto.offset.reset"
         * set in the configuration properties will be used for the partition.
         *
+        * This method does not affect where partitions are read from when the consumer is restored
+        * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+        * savepoint, only the offsets in the restored state will be used.
+        *
         * @return The consumer object, to allow function chaining.
         */
        public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets() {
                this.startupMode = StartupMode.GROUP_OFFSETS;
+               this.specificStartupOffsets = null;
+               return this;
+       }
+
+       /**
+        * Specifies the consumer to start reading partitions from specific offsets, set independently for each partition.
+        * The specified offset should be the offset of the next record that will be read from the partition.
+        * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+        *
+        * If the provided map of offsets contains entries whose {@link KafkaTopicPartition} is not subscribed by the
+        * consumer, the entry will be ignored. If the consumer subscribes to a partition that does not exist in the provided
+        * map of offsets, the consumer will fall back to the default group offset behaviour (see
+        * {@link FlinkKafkaConsumerBase#setStartFromGroupOffsets()}) for that particular partition.
+        *
+        * If the specified offset for a partition is invalid, or the behaviour for that partition is defaulted to group
+        * offsets but still no group offset could be found for it, then the "auto.offset.reset" behaviour set in the
+        * configuration properties will be used for the partition.
+        *
+        * This method does not affect where partitions are read from when the consumer is restored
+        * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+        * savepoint, only the offsets in the restored state will be used.
+        *
+        * @return The consumer object, to allow function chaining.
+        */
+       public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+               this.startupMode = StartupMode.SPECIFIC_OFFSETS;
+               this.specificStartupOffsets = checkNotNull(specificStartupOffsets);
                return this;
        }
 
@@ -269,7 +315,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
                                        kafkaTopicPartitions,
                                        getRuntimeContext().getIndexOfThisSubtask(),
                                        getRuntimeContext().getNumberOfParallelSubtasks(),
-                                       startupMode);
+                                       startupMode,
+                                       specificStartupOffsets);
 
                                if (subscribedPartitionsToStartOffsets.size() != 0) {
                                        switch (startupMode) {
@@ -285,6 +332,28 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
                                                                subscribedPartitionsToStartOffsets.size(),
                                                                subscribedPartitionsToStartOffsets.keySet());
                                                        break;
+                                               case SPECIFIC_OFFSETS:
+                                                       LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
+                                                               getRuntimeContext().getIndexOfThisSubtask(),
+                                                               subscribedPartitionsToStartOffsets.size(),
+                                                               specificStartupOffsets,
+                                                               subscribedPartitionsToStartOffsets.keySet());
+
+                                                       List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
+                                                       for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
+                                                               if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
+                                                                       partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
+                                                               }
+                                                       }
+
+                                                       if (partitionsDefaultedToGroupOffsets.size() > 0) {
+                                                               LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
+                                                                               "; their startup offsets will be defaulted to their committed group offsets in Kafka.",
+                                                                       getRuntimeContext().getIndexOfThisSubtask(),
+                                                                       partitionsDefaultedToGroupOffsets.size(),
+                                                                       partitionsDefaultedToGroupOffsets);
+                                                       }
+                                                       break;
                                                default:
                                                case GROUP_OFFSETS:
                                                        LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
@@ -550,6 +619,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
         * @param indexOfThisSubtask the index of this consumer instance
         * @param numParallelSubtasks total number of parallel consumer instances
         * @param startupMode the configured startup mode for the consumer
+        * @param specificStartupOffsets specific partition offsets to start from
+        *                               (only relevant if startupMode is {@link StartupMode#SPECIFIC_OFFSETS})
         *
         * Note: This method is also exposed for testing.
         */
@@ -558,11 +629,31 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
                        List<KafkaTopicPartition> kafkaTopicPartitions,
                        int indexOfThisSubtask,
                        int numParallelSubtasks,
-                       StartupMode startupMode) {
+                       StartupMode startupMode,
+                       Map<KafkaTopicPartition, Long> specificStartupOffsets) {
 
                for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
                        if (i % numParallelSubtasks == indexOfThisSubtask) {
-                               subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
+                               if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
+                                       subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
+                               } else {
+                                       if (specificStartupOffsets == null) {
+                                               throw new IllegalArgumentException(
+                                                       "Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
+                                                               ", but no specific offsets were specified");
+                                       }
+
+                                       KafkaTopicPartition partition = kafkaTopicPartitions.get(i);
+
+                                       Long specificOffset = specificStartupOffsets.get(partition);
+                                       if (specificOffset != null) {
+                                               // since the specified offsets represent the next record to read, we subtract
+                                               // one so that the initial state of the consumer will be correct
+                                               subscribedPartitionsToStartOffsets.put(partition, specificOffset - 1);
+                                       } else {
+                                               subscribedPartitionsToStartOffsets.put(partition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+                                       }
+                               }
                        }
                }
        }
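
The core of the change is easiest to see in isolation: round-robin assignment decides which partitions a subtask owns, and the offset map is consulted only for those. Below is a self-contained sketch of that logic; GROUP_OFFSET_SENTINEL is a hypothetical stand-in for KafkaTopicPartitionStateSentinel.GROUP_OFFSET, whose actual value this sketch does not assume:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Self-contained sketch of the logic above: round-robin partition assignment
// combined with specific-offset resolution. GROUP_OFFSET_SENTINEL is a
// placeholder value, not the real Flink sentinel constant.
public class AssignmentSketch {

	static final long GROUP_OFFSET_SENTINEL = Long.MIN_VALUE; // hypothetical stand-in

	static Map<String, Long> initialize(
			List<String> partitions, int subtaskIndex, int numSubtasks,
			Map<String, Long> specificStartupOffsets) {
		Map<String, Long> startOffsets = new HashMap<>();
		for (int i = 0; i < partitions.size(); i++) {
			if (i % numSubtasks == subtaskIndex) { // this subtask owns partition i
				Long specificOffset = specificStartupOffsets.get(partitions.get(i));
				if (specificOffset != null) {
					// the user supplies the offset of the next record to read,
					// so the stored state is one less: the last consumed offset
					startOffsets.put(partitions.get(i), specificOffset - 1);
				} else {
					startOffsets.put(partitions.get(i), GROUP_OFFSET_SENTINEL);
				}
			}
		}
		return startOffsets;
	}

	public static void main(String[] args) {
		Map<String, Long> specific = new HashMap<>();
		specific.put("myTopic-0", 23L);
		// subtask 0 of 2 owns partitions 0 and 2; partition 2 has no entry,
		// so it carries the group-offset sentinel until resolved later
		System.out.println(initialize(
			Arrays.asList("myTopic-0", "myTopic-1", "myTopic-2"), 0, 2, specific));
	}
}

Running main shows the two owned partitions: myTopic-0 mapped to 22 (the user-facing offset 23 minus one, i.e. the last consumed offset) and myTopic-2 mapped to the sentinel.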

http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
index f796e62..8fc2fe0 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/config/StartupMode.java
@@ -30,7 +30,14 @@ public enum StartupMode {
        EARLIEST(KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET),
 
        /** Start from the latest offset */
-       LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+       LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET),
+
+       /**
+        * Start from user-supplied specific offsets for each partition.
+        * Since this mode will have specific offsets to start with, we do not need a sentinel value;
+        * using Long.MIN_VALUE as a placeholder.
+        */
+        */
+       SPECIFIC_OFFSETS(Long.MIN_VALUE);
 
        /** The sentinel offset value corresponding to this startup mode */
        private long stateSentinel;
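
The enum-with-sentinel pattern used above, in miniature; the numeric values here are illustrative placeholders, not the constants from KafkaTopicPartitionStateSentinel:

// Miniature version of the enum pattern above. Each mode carries a sentinel
// offset that partition state can hold until a real offset is resolved.
// The numeric values are illustrative placeholders only.
enum StartupModeSketch {
	GROUP_OFFSETS(-1L),               // placeholder sentinel
	EARLIEST(-2L),                    // placeholder sentinel
	LATEST(-3L),                      // placeholder sentinel
	SPECIFIC_OFFSETS(Long.MIN_VALUE); // no sentinel needed; real offsets come from the user map

	private final long stateSentinel;

	StartupModeSketch(long stateSentinel) {
		this.stateSentinel = stateSentinel;
	}

	long getStateSentinel() {
		return stateSentinel;
	}
}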

http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
index 379d53a..c24640d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
@@ -56,7 +56,8 @@ public class KafkaConsumerPartitionAssignmentTest {
                                        inPartitions,
                                        i,
                                        inPartitions.size(),
-                                       StartupMode.GROUP_OFFSETS);
+                                       StartupMode.GROUP_OFFSETS,
+                                       null);
 
                                List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
 
@@ -95,7 +96,8 @@ public class KafkaConsumerPartitionAssignmentTest {
                                        partitions,
                                        i,
                                        numConsumers,
-                                       StartupMode.GROUP_OFFSETS);
+                                       StartupMode.GROUP_OFFSETS,
+                                       null);
 
                                List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
 
@@ -138,7 +140,8 @@ public class KafkaConsumerPartitionAssignmentTest {
                                        inPartitions,
                                        i,
                                        numConsumers,
-                                       StartupMode.GROUP_OFFSETS);
+                                       StartupMode.GROUP_OFFSETS,
+                                       null);
 
                                List<KafkaTopicPartition> subscribedPartitions = new ArrayList<>(subscribedPartitionsToStartOffsets.keySet());
 
@@ -169,7 +172,8 @@ public class KafkaConsumerPartitionAssignmentTest {
                                ep,
                                2,
                                4,
-                               StartupMode.GROUP_OFFSETS);
+                               StartupMode.GROUP_OFFSETS,
+                               null);
                        assertTrue(subscribedPartitionsToStartOffsets.entrySet().isEmpty());
 
                        subscribedPartitionsToStartOffsets = new HashMap<>();
@@ -178,7 +182,8 @@ public class KafkaConsumerPartitionAssignmentTest {
                                ep,
                                0,
                                1,
-                               StartupMode.GROUP_OFFSETS);
+                               StartupMode.GROUP_OFFSETS,
+                               null);
                        assertTrue(subscribedPartitionsToStartOffsets.entrySet().isEmpty());
                }
                catch (Exception e) {
@@ -218,21 +223,24 @@ public class KafkaConsumerPartitionAssignmentTest {
                                initialPartitions,
                                0,
                                numConsumers,
-                               StartupMode.GROUP_OFFSETS);
+                               StartupMode.GROUP_OFFSETS,
+                               null);
 
                        FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
                                subscribedPartitionsToStartOffsets2,
                                initialPartitions,
                                1,
                                numConsumers,
-                               StartupMode.GROUP_OFFSETS);
+                               StartupMode.GROUP_OFFSETS,
+                               null);
 
                        FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
                                subscribedPartitionsToStartOffsets3,
                                initialPartitions,
                                2,
                                numConsumers,
-                               StartupMode.GROUP_OFFSETS);
+                               StartupMode.GROUP_OFFSETS,
+                               null);
 
                        List<KafkaTopicPartition> subscribedPartitions1 = new ArrayList<>(subscribedPartitionsToStartOffsets1.keySet());
                        List<KafkaTopicPartition> subscribedPartitions2 = new ArrayList<>(subscribedPartitionsToStartOffsets2.keySet());
@@ -274,21 +282,24 @@ public class KafkaConsumerPartitionAssignmentTest {
                                newPartitions,
                                0,
                                numConsumers,
-                               StartupMode.GROUP_OFFSETS);
+                               StartupMode.GROUP_OFFSETS,
+                               null);
 
                        FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
                                subscribedPartitionsToStartOffsets2,
                                newPartitions,
                                1,
                                numConsumers,
-                               StartupMode.GROUP_OFFSETS);
+                               StartupMode.GROUP_OFFSETS,
+                               null);
 
                        FlinkKafkaConsumerBase.initializeSubscribedPartitionsToStartOffsets(
                                subscribedPartitionsToStartOffsets3,
                                newPartitions,
                                2,
                                numConsumers,
-                               StartupMode.GROUP_OFFSETS);
+                               StartupMode.GROUP_OFFSETS,
+                               null);
 
                        List<KafkaTopicPartition> subscribedPartitions1New = new ArrayList<>(subscribedPartitionsToStartOffsets1.keySet());
                        List<KafkaTopicPartition> subscribedPartitions2New = new ArrayList<>(subscribedPartitionsToStartOffsets2.keySet());

http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 580c507..ddac61c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -62,6 +62,7 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
 import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
@@ -349,7 +350,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                        (o3 != null) ? o3.intValue() : 0
                ));
 
-               readSequence(env2, StartupMode.GROUP_OFFSETS, standardProps, topicName, partitionsToValuesCountAndStartOffset);
+               readSequence(env2, StartupMode.GROUP_OFFSETS, null, standardProps, topicName, partitionsToValuesCountAndStartOffset);
 
                kafkaOffsetHandler.close();
                deleteTestTopic(topicName);
@@ -465,7 +466,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
                kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
 
-               readSequence(env, StartupMode.EARLIEST, readProps, parallelism, topicName, recordsInEachPartition, 0);
+               readSequence(env, StartupMode.EARLIEST, null, readProps, parallelism, topicName, recordsInEachPartition, 0);
 
                kafkaOffsetHandler.close();
                deleteTestTopic(topicName);
@@ -619,7 +620,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
         *      partition 2 --> start from offset 43, read to offset 49 (7 records)
         */
        public void runStartFromGroupOffsets() throws Exception {
-               // 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
+               // 3 partitions with 50 records each (offsets 0-49)
                final int parallelism = 3;
                final int recordsInEachPartition = 50;
 
@@ -645,7 +646,71 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                partitionsToValueCountAndStartOffsets.put(1, new Tuple2<>(50, 0)); // partition 1 should read offset 0-49
                partitionsToValueCountAndStartOffsets.put(2, new Tuple2<>(7, 43)); // partition 2 should read offset 43-49
 
-               readSequence(env, StartupMode.GROUP_OFFSETS, readProps, topicName, partitionsToValueCountAndStartOffsets);
+               readSequence(env, StartupMode.GROUP_OFFSETS, null, readProps, topicName, partitionsToValueCountAndStartOffsets);
+
+               kafkaOffsetHandler.close();
+               deleteTestTopic(topicName);
+       }
+
+       /**
+        * This test ensures that the consumer correctly uses user-supplied specific offsets when explicitly configured to
+        * start from specific offsets. For partitions for which no specific offset can be found, the starting position
+        * should fall back to the group offsets behaviour.
+        *
+        * 4 partitions will have 50 records with offsets 0 to 49. The supplied specific offsets map is:
+        *      partition 0 --> start from offset 19
+        *      partition 1 --> not set
+        *      partition 2 --> start from offset 22
+        *      partition 3 --> not set
+        *      partition 4 --> start from offset 26 (this should be ignored because the partition does not exist)
+        *
+        * The partitions and their committed group offsets are set up as:
+        *      partition 0 --> committed offset 23
+        *      partition 1 --> committed offset 31
+        *      partition 2 --> committed offset 43
+        *      partition 3 --> no committed offset
+        *
+        * When configured to start from these specific offsets, each partition should read:
+        *      partition 0 --> start from offset 19, read to offset 49 (31 records)
+        *      partition 1 --> fall back to group offsets, so start from offset 31, read to offset 49 (19 records)
+        *      partition 2 --> start from offset 22, read to offset 49 (28 records)
+        *      partition 3 --> fall back to group offsets, but since there is no group offset for this partition,
+        *                      will default to "auto.offset.reset" (set to "earliest"),
+        *                      so start from offset 0, read to offset 49 (50 records)
+        */
+       public void runStartFromSpecificOffsets() throws Exception {
+               // 4 partitions with 50 records each (offsets 0-49)
+               final int parallelism = 4;
+               final int recordsInEachPartition = 50;
+
+               final String topicName = writeSequence("testStartFromSpecificOffsetsTopic", recordsInEachPartition, parallelism, 1);
+
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+               env.getConfig().disableSysoutLogging();
+               env.setParallelism(parallelism);
+
+               Properties readProps = new Properties();
+               readProps.putAll(standardProps);
+               readProps.setProperty("auto.offset.reset", "earliest"); // partition 3 should default back to this behaviour
+
+               Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>();
+               specificStartupOffsets.put(new KafkaTopicPartition(topicName, 0), 19L);
+               specificStartupOffsets.put(new KafkaTopicPartition(topicName, 2), 22L);
+               specificStartupOffsets.put(new KafkaTopicPartition(topicName, 4), 26L); // non-existing partition, should be ignored
+
+               // only the committed offset for partition 1 should be used, because partition 1 has no entry in the specific offsets map
+               KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler();
+               kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+               kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+               kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+               Map<Integer, Tuple2<Integer, Integer>> partitionsToValueCountAndStartOffsets = new HashMap<>();
+               partitionsToValueCountAndStartOffsets.put(0, new Tuple2<>(31, 19)); // partition 0 should read offset 19-49
+               partitionsToValueCountAndStartOffsets.put(1, new Tuple2<>(19, 31)); // partition 1 should read offset 31-49
+               partitionsToValueCountAndStartOffsets.put(2, new Tuple2<>(28, 22)); // partition 2 should read offset 22-49
+               partitionsToValueCountAndStartOffsets.put(3, new Tuple2<>(50, 0));  // partition 3 should read offset 0-49
+
+               readSequence(env, StartupMode.SPECIFIC_OFFSETS, specificStartupOffsets, readProps, topicName, partitionsToValueCountAndStartOffsets);
 
                kafkaOffsetHandler.close();
                deleteTestTopic(topicName);
@@ -1781,6 +1846,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
         */
        protected void readSequence(final StreamExecutionEnvironment env,
                                                                final StartupMode startupMode,
+                                                               final Map<KafkaTopicPartition, Long> specificStartupOffsets,
                                                                final Properties cc,
                                                                final String topicName,
                                                                final Map<Integer, Tuple2<Integer, Integer>> partitionsToValuesCountAndStartOffset) throws Exception {
@@ -1807,6 +1873,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                        case LATEST:
                                consumer.setStartFromLatest();
                                break;
+                       case SPECIFIC_OFFSETS:
+                               consumer.setStartFromSpecificOffsets(specificStartupOffsets);
+                               break;
                        case GROUP_OFFSETS:
                                consumer.setStartFromGroupOffsets();
                                break;
@@ -1874,11 +1943,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
        }
 
        /**
-        * Variant of {@link KafkaConsumerTestBase#readSequence(StreamExecutionEnvironment, StartupMode, Properties, String, Map)} to
+        * Variant of {@link KafkaConsumerTestBase#readSequence(StreamExecutionEnvironment, StartupMode, Map, Properties, String, Map)} to
         * expect reading from the same start offset and the same value count for all partitions of a single Kafka topic.
         */
        protected void readSequence(final StreamExecutionEnvironment env,
                                                                final StartupMode startupMode,
+                                                               final Map<KafkaTopicPartition, Long> specificStartupOffsets,
                                                                final Properties cc,
                                                                final int sourceParallelism,
                                                                final String topicName,
@@ -1888,7 +1958,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
                for (int i = 0; i < sourceParallelism; i++) {
                        partitionsToValuesCountAndStartOffset.put(i, new Tuple2<>(valuesCount, startFrom));
                }
-               readSequence(env, startupMode, cc, topicName, partitionsToValuesCountAndStartOffset);
+               readSequence(env, startupMode, specificStartupOffsets, cc, topicName, partitionsToValuesCountAndStartOffset);
        }
 
        protected String writeSequence(
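
As a quick sanity check on the expectations in runStartFromSpecificOffsets, the per-partition record counts follow directly from the effective start offsets; a throwaway snippet, not part of the test code:

// Sanity check: each partition holds offsets 0-49, so the number of records
// read is 50 minus the effective start offset after fallback resolution.
public class ExpectedCountsCheck {
	public static void main(String[] args) {
		int recordsInEachPartition = 50;
		int[] effectiveStartOffsets = {19, 31, 22, 0}; // partitions 0-3
		for (int p = 0; p < effectiveStartOffsets.length; p++) {
			System.out.println("partition " + p + " reads "
				+ (recordsInEachPartition - effectiveStartOffsets[p]) + " records");
		}
		// prints 31, 19, 28, and 50 -- matching the Tuple2 expectations above
	}
}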
