Repository: flink
Updated Branches:
  refs/heads/release-1.1 a34559d20 -> e296acae5


[FLINK-6006] [kafka] Always use complete restored state in FlinkKafkaConsumer

Previously, the Kafka consumer performed a partition list query on
restore, and then used the returned list to filter out restored state
for partitions that do not appear in the list.

If the returned partition list is incomplete (i.e., missing partitions
that existed before, perhaps due to temporary ZK / broker downtime),
the state of the missing partitions is dropped and can never be
recovered.

This commit fixes the issue by always restoring the complete state,
without any filtering. If partitions assigned to the consuming threads
/ Kafka clients are unreachable when the consumer starts running, the
consumer simply fails.
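
For illustration, a minimal standalone sketch of the assignment rule
after this fix (the class name RestoreDemo and the String partition
stand-ins are hypothetical; the real method is
FlinkKafkaConsumerBase#assignPartitions and operates on
KafkaTopicPartition):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RestoreDemo {

        // Mirrors the new precedence rule: restored state wins
        // unconditionally, and the (possibly incomplete) queried
        // partition list is only consulted on a fresh start.
        static List<String> assign(
                Map<String, Long> restoredOffsets, // null on a fresh start
                List<String> queriedPartitions,
                int numConsumers,
                int consumerIndex) {
            if (restoredOffsets != null) {
                return new ArrayList<>(restoredOffsets.keySet());
            }
            List<String> mine = new ArrayList<>();
            for (int i = 0; i < queriedPartitions.size(); i++) {
                if (i % numConsumers == consumerIndex) {
                    mine.add(queriedPartitions.get(i));
                }
            }
            return mine;
        }

        public static void main(String[] args) {
            Map<String, Long> restored = new HashMap<>();
            restored.put("test-topic-0", 23L);
            restored.put("test-topic-2", 42L);
            // Even though the broker only reports partition 0 here,
            // both restored partitions survive:
            System.out.println(assign(restored, Arrays.asList("test-topic-0"), 2, 0));
        }
    }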

This closes #3507.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e296acae
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e296acae
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e296acae

Branch: refs/heads/release-1.1
Commit: e296acae5269d3da7af87514804c06b865e40d3e
Parents: a34559d
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Fri Mar 10 14:47:57 2017 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Wed Mar 15 22:35:22 2017 +0800

----------------------------------------------------------------------
 .../kafka/FlinkKafkaConsumerBase.java           | 44 ++++++----
 .../kafka/internals/AbstractFetcher.java        | 33 ++++---
 .../kafka/FlinkKafkaConsumerBaseTest.java       | 92 ++++++++++++++++++--
 .../KafkaConsumerPartitionAssignmentTest.java   | 51 ++++++++---
 .../kafka/KafkaShortRetentionTestBase.java      |  1 -
 5 files changed, 175 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e296acae/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 2b2c527..bab7639 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -210,8 +210,11 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
                }
                
                // figure out which partitions this subtask should process
-               final List<KafkaTopicPartition> thisSubtaskPartitions = assignPartitions(allSubscribedPartitions,
-                               getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getIndexOfThisSubtask());
+               final List<KafkaTopicPartition> thisSubtaskPartitions = assignPartitions(
+                               restoreToOffset,
+                               allSubscribedPartitions,
+                               getRuntimeContext().getNumberOfParallelSubtasks(),
+                               getRuntimeContext().getIndexOfThisSubtask());
                
                // we need only do work, if we actually have partitions assigned
                if (!thisSubtaskPartitions.isEmpty()) {
@@ -416,29 +419,38 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
        // ------------------------------------------------------------------------
 
        /**
-        * Selects which of the given partitions should be handled by a specific consumer,
-        * given a certain number of consumers.
-        * 
-        * @param allPartitions The partitions to select from
+        * Determines which partitions the consumer should subscribe to.
+        *
+        * If we have restored offsets, we simply use that as the subscribed partitions for the subtask.
+        * Otherwise, we select from the given complete list of partitions, given a certain number of consumer subtasks.
+        *
+        * @param restoredPartitionOffsets The restored partition offsets, if any
+        * @param completeKafkaPartitionsList The complete list of Kafka partitions
         * @param numConsumers The number of consumers
         * @param consumerIndex The index of the specific consumer
         * 
         * @return The sublist of partitions to be handled by that consumer.
         */
        protected static List<KafkaTopicPartition> assignPartitions(
-                       List<KafkaTopicPartition> allPartitions,
-                       int numConsumers, int consumerIndex)
+                       Map<KafkaTopicPartition, Long> restoredPartitionOffsets,
+                       List<KafkaTopicPartition> completeKafkaPartitionsList,
+                       int numConsumers,
+                       int consumerIndex)
        {
-               final List<KafkaTopicPartition> thisSubtaskPartitions = new ArrayList<>(
-                               allPartitions.size() / numConsumers + 1);
-
-               for (int i = 0; i < allPartitions.size(); i++) {
-                       if (i % numConsumers == consumerIndex) {
-                               thisSubtaskPartitions.add(allPartitions.get(i));
+               if (restoredPartitionOffsets != null) {
+                       return new ArrayList<>(restoredPartitionOffsets.keySet());
+               } else {
+                       final List<KafkaTopicPartition> thisSubtaskPartitions =
+                               new ArrayList<>(completeKafkaPartitionsList.size() / numConsumers + 1);
+
+                       for (int i = 0; i < completeKafkaPartitionsList.size(); i++) {
+                               if (i % numConsumers == consumerIndex) {
+                                       thisSubtaskPartitions.add(completeKafkaPartitionsList.get(i));
+                               }
                        }
+
+                       return thisSubtaskPartitions;
                }
-               
-               return thisSubtaskPartitions;
        }
        
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/e296acae/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 8ec26cc..979dccc 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -181,23 +181,36 @@ public abstract class AbstractFetcher<T, KPH> {
 
                HashMap<KafkaTopicPartition, Long> state = new HashMap<>(allPartitions.length);
                for (KafkaTopicPartitionState<?> partition : subscribedPartitions()) {
-                       if (partition.isOffsetDefined()) {
-                               state.put(partition.getKafkaTopicPartition(), partition.getOffset());
-                       }
+                       state.put(partition.getKafkaTopicPartition(), partition.getOffset());
                }
                return state;
        }
 
        /**
         * Restores the partition offsets.
-        * 
-        * @param snapshotState The offsets for the partitions 
+        * The partitions in the provided map of restored partitions to offsets must completely match
+        * the fetcher's subscribed partitions.
+        *
+        * @param restoredOffsets The restored offsets for the partitions
+        *
+        * @throws IllegalStateException if the partitions in the provided restored offsets map
+        * do not completely match the fetcher's subscribed partitions.
         */
-       public void restoreOffsets(HashMap<KafkaTopicPartition, Long> snapshotState) {
-               for (KafkaTopicPartitionState<?> partition : allPartitions) {
-                       Long offset = snapshotState.get(partition.getKafkaTopicPartition());
-                       if (offset != null) {
-                               partition.setOffset(offset);
+       public void restoreOffsets(Map<KafkaTopicPartition, Long> restoredOffsets) {
+               if (restoredOffsets.size() != allPartitions.length) {
+                       throw new IllegalStateException(
+                               "The fetcher was restored with partition offsets that do not " +
+                                       "match with the subscribed partitions: " + restoredOffsets);
+               } else {
+                       for (KafkaTopicPartitionState<?> partition : allPartitions) {
+                               Long offset = restoredOffsets.get(partition.getKafkaTopicPartition());
+                               if (offset != null) {
+                                       partition.setOffset(offset);
+                               } else {
+                                       throw new IllegalStateException(
+                                               "The fetcher was restored with partition offsets that do not " +
+                                                       "contain offsets for subscribed partition " + partition.getKafkaTopicPartition());
+                               }
                        }
                }
        }
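
For reference, a minimal standalone sketch of the fail-fast restore
contract introduced above (the class RestoreCheckDemo and the String
partition stand-ins are hypothetical, not the actual fetcher API):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RestoreCheckDemo {

        // The restored offsets must cover the subscribed partitions
        // exactly; on any mismatch we fail immediately instead of
        // silently dropping state.
        static void restoreOffsets(List<String> subscribed, Map<String, Long> restored) {
            if (restored.size() != subscribed.size()) {
                throw new IllegalStateException(
                    "Restored offsets do not match the subscribed partitions: " + restored);
            }
            for (String partition : subscribed) {
                Long offset = restored.get(partition);
                if (offset == null) {
                    throw new IllegalStateException(
                        "Restored offsets contain no offset for subscribed partition " + partition);
                }
                // the real fetcher would apply the offset to its
                // partition state here
            }
        }

        public static void main(String[] args) {
            Map<String, Long> restored = new HashMap<>();
            restored.put("test-topic-0", 23L);
            restoreOffsets(Arrays.asList("test-topic-0"), restored); // ok
            restoreOffsets(Arrays.asList("test-topic-1"), restored); // throws
        }
    }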

http://git-wip-us.apache.org/repos/asf/flink/blob/e296acae/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 9b517df..3926513 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -19,8 +19,11 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.commons.collections.map.LinkedMap;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
@@ -30,6 +33,7 @@ import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 
 import java.lang.reflect.Field;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 
@@ -44,12 +48,14 @@ public class FlinkKafkaConsumerBaseTest {
        @Test
        public void testEitherWatermarkExtractor() {
                try {
-                       new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks<Object>) null);
+                       new DummyFlinkKafkaConsumer<>(null, null, 1, 0)
+                               .assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks<Object>) null);
                        fail();
                } catch (NullPointerException ignored) {}
 
                try {
-                       new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks<Object>) null);
+                       new DummyFlinkKafkaConsumer<>(null, null, 1, 0)
+                               .assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks<Object>) null);
                        fail();
                } catch (NullPointerException ignored) {}
                
@@ -58,14 +64,14 @@ public class FlinkKafkaConsumerBaseTest {
                @SuppressWarnings("unchecked")
                final AssignerWithPunctuatedWatermarks<String> punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class);
                
-               DummyFlinkKafkaConsumer<String> c1 = new DummyFlinkKafkaConsumer<>();
+               DummyFlinkKafkaConsumer<String> c1 = new DummyFlinkKafkaConsumer<>(null, null, 1, 0);
                c1.assignTimestampsAndWatermarks(periodicAssigner);
                try {
                        c1.assignTimestampsAndWatermarks(punctuatedAssigner);
                        fail();
                } catch (IllegalStateException ignored) {}
 
-               DummyFlinkKafkaConsumer<String> c2 = new DummyFlinkKafkaConsumer<>();
+               DummyFlinkKafkaConsumer<String> c2 = new DummyFlinkKafkaConsumer<>(null, null, 1, 0);
                c2.assignTimestampsAndWatermarks(punctuatedAssigner);
                try {
                        c2.assignTimestampsAndWatermarks(periodicAssigner);
@@ -109,6 +115,45 @@ public class FlinkKafkaConsumerBaseTest {
                FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
                assertNull(consumer.snapshotState(17L, 23L));
        }
+
+       /**
+        * Tests that the fetcher is restored with all partitions in the restored state,
+        * regardless of the queried complete list of Kafka partitions.
+        */
+       @Test
+       public void testStateIntactOnRestore() throws Exception {
+               @SuppressWarnings("unchecked")
+               final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
+
+               HashMap<KafkaTopicPartition, Long> restoreState = new HashMap<>();
+               restoreState.put(new KafkaTopicPartition("test-topic", 0), 23L);
+               restoreState.put(new KafkaTopicPartition("test-topic", 2), 42L);
+
+               List<KafkaTopicPartition> mockCompletePartitions = new ArrayList<>(6);
+               mockCompletePartitions.add(new KafkaTopicPartition("test-topic", 0));
+               mockCompletePartitions.add(new KafkaTopicPartition("test-topic", 1));
+               mockCompletePartitions.add(new KafkaTopicPartition("test-topic", 2));
+               mockCompletePartitions.add(new KafkaTopicPartition("test-topic", 3));
+               mockCompletePartitions.add(new KafkaTopicPartition("test-topic", 4));
+
+               List<KafkaTopicPartition> expectedFetcherSubscribedPartitions = new ArrayList<>(3);
+               expectedFetcherSubscribedPartitions.add(new KafkaTopicPartition("test-topic", 0));
+               expectedFetcherSubscribedPartitions.add(new KafkaTopicPartition("test-topic", 2));
+
+               FlinkKafkaConsumerBase<String> consumer = new DummyFlinkKafkaConsumer<>(
+                       fetcher,
+                       expectedFetcherSubscribedPartitions,
+                       2,
+                       0);
+
+               consumer.setSubscribedPartitions(mockCompletePartitions);
+               consumer.restoreState(restoreState);
+               consumer.open(new Configuration());
+
+               consumer.run(mock(SourceFunction.SourceContext.class));
+
+               verify(fetcher).restoreOffsets(restoreState);
+       }
        
        @Test
        @SuppressWarnings("unchecked")
@@ -187,7 +232,7 @@ public class FlinkKafkaConsumerBaseTest {
        private static <T> FlinkKafkaConsumerBase<T> getConsumer(
                        AbstractFetcher<T, ?> fetcher, LinkedMap pendingCheckpoints, boolean running) throws Exception
        {
-               FlinkKafkaConsumerBase<T> consumer = new DummyFlinkKafkaConsumer<>();
+               FlinkKafkaConsumerBase<T> consumer = new DummyFlinkKafkaConsumer<>(fetcher, null, 1, 0);
 
                Field fetcherField = FlinkKafkaConsumerBase.class.getDeclaredField("kafkaFetcher");
                fetcherField.setAccessible(true);
@@ -209,14 +254,45 @@ public class FlinkKafkaConsumerBaseTest {
        private static final class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
                private static final long serialVersionUID = 1L;
 
+               private final AbstractFetcher<T, ?> mockFetcher;
+               private final List<KafkaTopicPartition> expectedThisSubtaskPartitions;
+               private final int numConsumerSubtasks;
+               private final int thisConsumerSubtaskIndex;
+
                @SuppressWarnings("unchecked")
-               public DummyFlinkKafkaConsumer() {
+               public DummyFlinkKafkaConsumer(
+                               AbstractFetcher<T, ?> mockFetcher,
+                               List<KafkaTopicPartition> expectedThisSubtaskPartitions,
+                               int numConsumerSubtasks,
+                               int thisConsumerSubtaskIndex) {
                        super((KeyedDeserializationSchema<T>) mock(KeyedDeserializationSchema.class));
+                       this.mockFetcher = mockFetcher;
+                       this.expectedThisSubtaskPartitions = expectedThisSubtaskPartitions;
+                       this.numConsumerSubtasks = numConsumerSubtasks;
+                       this.thisConsumerSubtaskIndex = thisConsumerSubtaskIndex;
+               }
+
+               @Override
+               protected AbstractFetcher<T, ?> createFetcher(
+                               SourceContext<T> sourceContext,
+                               List<KafkaTopicPartition> thisSubtaskPartitions,
+                               SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+                               SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+                               StreamingRuntimeContext runtimeContext) throws Exception {
+
+                       assertEquals(expectedThisSubtaskPartitions.size(), thisSubtaskPartitions.size());
+                       for (KafkaTopicPartition expectedPartition : expectedThisSubtaskPartitions) {
+                               assertTrue(thisSubtaskPartitions.contains(expectedPartition));
+                       }
+                       return mockFetcher;
                }
 
                @Override
-               protected AbstractFetcher<T, ?> createFetcher(SourceContext<T> sourceContext, List<KafkaTopicPartition> thisSubtaskPartitions, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws Exception {
-                       return null;
+               public RuntimeContext getRuntimeContext() {
+                       RuntimeContext context = mock(StreamingRuntimeContext.class);
+                       when(context.getNumberOfParallelSubtasks()).thenReturn(numConsumerSubtasks);
+                       when(context.getIndexOfThisSubtask()).thenReturn(thisConsumerSubtaskIndex);
+                       return context;
+                       return context;
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e296acae/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
index 9beed22..d580a37 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
@@ -24,8 +24,10 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.*;
@@ -46,7 +48,7 @@ public class KafkaConsumerPartitionAssignmentTest {
 
                        for (int i = 0; i < inPartitions.size(); i++) {
                                List<KafkaTopicPartition> parts = 
-                                               FlinkKafkaConsumerBase.assignPartitions(inPartitions, inPartitions.size(), i);
+                                               FlinkKafkaConsumerBase.assignPartitions(null, inPartitions, inPartitions.size(), i);
 
                                assertNotNull(parts);
                                assertEquals(1, parts.size());
@@ -88,7 +90,7 @@ public class KafkaConsumerPartitionAssignmentTest {
 
                        for (int i = 0; i < numConsumers; i++) {
                                List<KafkaTopicPartition> parts = 
-                                               FlinkKafkaConsumerBase.assignPartitions(partitions, numConsumers, i);
+                                               FlinkKafkaConsumerBase.assignPartitions(null, partitions, numConsumers, i);
 
                                assertNotNull(parts);
                                assertTrue(parts.size() >= minPartitionsPerConsumer);
@@ -124,7 +126,7 @@ public class KafkaConsumerPartitionAssignmentTest {
                        final int numConsumers = 2 * inPartitions.size() + 3;
 
                        for (int i = 0; i < numConsumers; i++) {
-                               List<KafkaTopicPartition> parts = FlinkKafkaConsumerBase.assignPartitions(inPartitions, numConsumers, i);
+                               List<KafkaTopicPartition> parts = FlinkKafkaConsumerBase.assignPartitions(null, inPartitions, numConsumers, i);
 
                                assertNotNull(parts);
                                assertTrue(parts.size() <= 1);
@@ -148,11 +150,11 @@ public class KafkaConsumerPartitionAssignmentTest {
        public void testAssignEmptyPartitions() {
                try {
                        List<KafkaTopicPartition> ep = new ArrayList<>();
-                       List<KafkaTopicPartition> parts1 = FlinkKafkaConsumerBase.assignPartitions(ep, 4, 2);
+                       List<KafkaTopicPartition> parts1 = FlinkKafkaConsumerBase.assignPartitions(null, ep, 4, 2);
                        assertNotNull(parts1);
                        assertTrue(parts1.isEmpty());
 
-                       List<KafkaTopicPartition> parts2 = FlinkKafkaConsumerBase.assignPartitions(ep, 1, 0);
+                       List<KafkaTopicPartition> parts2 = FlinkKafkaConsumerBase.assignPartitions(null, ep, 1, 0);
                        assertNotNull(parts2);
                        assertTrue(parts2.isEmpty());
                }
@@ -163,6 +165,33 @@ public class KafkaConsumerPartitionAssignmentTest {
        }
 
        @Test
+       public void testAssignOnRestoreOffsets() {
+               Map<KafkaTopicPartition, Long> restoredOffsets = new HashMap<>();
+               restoredOffsets.put(new KafkaTopicPartition("test-topic", 0), 23L);
+               restoredOffsets.put(new KafkaTopicPartition("test-topic", 2), 42L);
+
+               List<KafkaTopicPartition> completePartitionsList = Arrays.asList(
+                       new KafkaTopicPartition("test-topic", 0),
+                       new KafkaTopicPartition("test-topic", 1),
+                       new KafkaTopicPartition("test-topic", 2),
+                       new KafkaTopicPartition("test-topic", 3),
+                       new KafkaTopicPartition("test-topic", 4));
+
+               List<KafkaTopicPartition> expectedAssignedPartitions = new ArrayList<>(2);
+               expectedAssignedPartitions.add(new KafkaTopicPartition("test-topic", 0));
+               expectedAssignedPartitions.add(new KafkaTopicPartition("test-topic", 2));
+
+               List<KafkaTopicPartition> assignedPartitions = FlinkKafkaConsumerBase.assignPartitions(restoredOffsets, completePartitionsList, 1, 0);
+
+               // regardless of the complete partitions list, number of consumers, and consumer index, we should be
+               // assigned all partitions in the restored offsets
+               assertEquals(expectedAssignedPartitions.size(), assignedPartitions.size());
+               for (KafkaTopicPartition expectedPartition : expectedAssignedPartitions) {
+                       assertTrue(assignedPartitions.contains(expectedPartition));
+               }
+       }
+
+       @Test
        public void testGrowingPartitionsRemainsStable() {
                try {
                        final int[] newPartitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
@@ -185,11 +214,11 @@ public class KafkaConsumerPartitionAssignmentTest {
                        final int maxNewPartitionsPerConsumer = newPartitions.size() / numConsumers + 1;
 
                        List<KafkaTopicPartition> parts1 = FlinkKafkaConsumerBase.assignPartitions(
-                                       initialPartitions, numConsumers, 0);
+                                       null, initialPartitions, numConsumers, 0);
                        List<KafkaTopicPartition> parts2 = FlinkKafkaConsumerBase.assignPartitions(
-                                       initialPartitions, numConsumers, 1);
+                                       null, initialPartitions, numConsumers, 1);
                        List<KafkaTopicPartition> parts3 = FlinkKafkaConsumerBase.assignPartitions(
-                                       initialPartitions, numConsumers, 2);
+                                       null, initialPartitions, numConsumers, 2);
 
                        assertNotNull(parts1);
                        assertNotNull(parts2);
@@ -221,11 +250,11 @@ public class KafkaConsumerPartitionAssignmentTest {
                        // grow the set of partitions and distribute anew
 
                        List<KafkaTopicPartition> parts1new = FlinkKafkaConsumerBase.assignPartitions(
-                                       newPartitions, numConsumers, 0);
+                                       null, newPartitions, numConsumers, 0);
                        List<KafkaTopicPartition> parts2new = FlinkKafkaConsumerBase.assignPartitions(
-                                       newPartitions, numConsumers, 1);
+                                       null, newPartitions, numConsumers, 1);
                        List<KafkaTopicPartition> parts3new = FlinkKafkaConsumerBase.assignPartitions(
-                                       newPartitions, numConsumers, 2);
+                                       null, newPartitions, numConsumers, 2);
 
                        // new partitions must include all old partitions
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e296acae/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 9e3c33b..d24579c 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -233,7 +233,6 @@ public class KafkaShortRetentionTestBase implements Serializable {
                try {
                        env.execute("Test auto offset reset none");
                } catch(Throwable e) {
-                       System.out.println("MESSAGE: " + e.getCause().getCause().getMessage());
                        // check if correct exception has been thrown
                        if(!e.getCause().getCause().getMessage().contains("Unable to find previous offset")  // kafka 0.8
                         && !e.getCause().getCause().getMessage().contains("Undefined offset with no reset policy for partition") // kafka 0.9
