[FLINK-6988][kafka] Add test for failure before before checkpoint and scaling 
down


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4ada50b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4ada50b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4ada50b3

Branch: refs/heads/master
Commit: 4ada50b3dd7c4af9735585a8c45eda4de10bb6e5
Parents: 867c012
Author: Piotr Nowojski <[email protected]>
Authored: Thu Aug 24 14:16:55 2017 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Oct 9 18:58:36 2017 +0200

----------------------------------------------------------------------
 .../kafka/FlinkKafkaProducer011Tests.java       | 114 ++++++++++++++++---
 .../util/OneInputStreamOperatorTestHarness.java |  11 ++
 2 files changed, 108 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4ada50b3/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
index 51410da..dd21bf4 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java
@@ -37,6 +37,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -258,26 +259,61 @@ public class FlinkKafkaProducer011Tests extends 
KafkaTestBase {
                deleteTestTopic(topic);
        }
 
-       private OneInputStreamOperatorTestHarness<Integer, Object> 
createTestHarness(String topic) throws Exception {
-               Properties properties = createProperties();
+       /**
+        * This tests checks whether FlinkKafkaProducer011 correctly aborts 
lingering transactions after a failure,
+        * which happened before first checkpoint and was followed up by 
reducing the parallelism.
+        * If such transactions were left alone lingering it consumers would be 
unable to read committed records
+        * that were created after this lingering transaction.
+        */
+       @Test(timeout = 120_000L)
+       public void testScaleDownBeforeFirstCheckpoint() throws Exception {
+               String topic = "scale-down-before-first-checkpoint";
+
+               List<AutoCloseable> operatorsToClose = new ArrayList<>();
+               int preScaleDownParallelism = Math.max(2, 
FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR);
+               for (int subtaskIndex = 0; subtaskIndex < 
preScaleDownParallelism; subtaskIndex++) {
+                       OneInputStreamOperatorTestHarness<Integer, Object> 
preScaleDownOperator = createTestHarness(
+                               topic,
+                               preScaleDownParallelism,
+                               preScaleDownParallelism,
+                               subtaskIndex);
+
+                       preScaleDownOperator.setup();
+                       preScaleDownOperator.open();
+                       preScaleDownOperator.processElement(subtaskIndex * 2, 
0);
+                       preScaleDownOperator.snapshot(0, 1);
+                       preScaleDownOperator.processElement(subtaskIndex * 2 + 
1, 2);
+
+                       operatorsToClose.add(preScaleDownOperator);
+               }
 
-               FlinkKafkaProducer011<Integer> kafkaProducer = new 
FlinkKafkaProducer011<>(
-                       topic,
-                       integerKeyedSerializationSchema,
-                       properties,
-                       FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
+               // do not close previous testHarnesses to make sure that 
closing do not clean up something (in case of failure
+               // there might not be any close)
 
-               return new OneInputStreamOperatorTestHarness<>(
-                       new StreamSink<>(kafkaProducer),
-                       IntSerializer.INSTANCE);
-       }
+               // After previous failure simulate restarting application with 
smaller parallelism
+               OneInputStreamOperatorTestHarness<Integer, Object> 
postScaleDownOperator1 = createTestHarness(topic, 1, 1, 0);
 
-       private Properties createProperties() {
-               Properties properties = new Properties();
-               properties.putAll(standardProps);
-               properties.putAll(secureProps);
-               properties.put(FlinkKafkaProducer011.KEY_DISABLE_METRICS, 
"true");
-               return properties;
+               postScaleDownOperator1.setup();
+               postScaleDownOperator1.open();
+
+               // write and commit more records, after potentially lingering 
transactions
+               postScaleDownOperator1.processElement(46, 7);
+               postScaleDownOperator1.snapshot(4, 8);
+               postScaleDownOperator1.processElement(47, 9);
+               postScaleDownOperator1.notifyOfCompletedCheckpoint(4);
+
+               //now we should have:
+               // - records 42, 43, 44 and 45 in aborted transactions
+               // - committed transaction with record 46
+               // - pending transaction with record 47
+               assertExactlyOnceForTopic(createProperties(), topic, 0, 
Arrays.asList(46), 30_000L);
+
+               postScaleDownOperator1.close();
+               // ignore ProducerFencedExceptions, because 
postScaleDownOperator1 could reuse transactional ids.
+               for (AutoCloseable operatorToClose : operatorsToClose) {
+                       closeIgnoringProducerFenced(operatorToClose);
+               }
+               deleteTestTopic(topic);
        }
 
        @Test
@@ -363,4 +399,48 @@ public class FlinkKafkaProducer011Tests extends 
KafkaTestBase {
                        assertEquals(expectedValue, record.value());
                }
        }
+
+       private void closeIgnoringProducerFenced(AutoCloseable autoCloseable) 
throws Exception {
+               try {
+                       autoCloseable.close();
+               }
+               catch (Exception ex) {
+                       if (!(ex.getCause() instanceof 
ProducerFencedException)) {
+                               throw ex;
+                       }
+               }
+       }
+
+       private OneInputStreamOperatorTestHarness<Integer, Object> 
createTestHarness(String topic) throws Exception {
+               return createTestHarness(topic, 1, 1, 0);
+       }
+
+       private OneInputStreamOperatorTestHarness<Integer, Object> 
createTestHarness(
+               String topic,
+               int maxParallelism,
+               int parallelism,
+               int subtaskIndex) throws Exception {
+               Properties properties = createProperties();
+
+               FlinkKafkaProducer011<Integer> kafkaProducer = new 
FlinkKafkaProducer011<>(
+                       topic,
+                       integerKeyedSerializationSchema,
+                       properties,
+                       FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
+
+               return new OneInputStreamOperatorTestHarness<>(
+                       new StreamSink<>(kafkaProducer),
+                       maxParallelism,
+                       parallelism,
+                       subtaskIndex,
+                       IntSerializer.INSTANCE);
+       }
+
+       private Properties createProperties() {
+               Properties properties = new Properties();
+               properties.putAll(standardProps);
+               properties.putAll(secureProps);
+               properties.put(FlinkKafkaProducer011.KEY_DISABLE_METRICS, 
"true");
+               return properties;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4ada50b3/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index c8fa2a4..5c7d986 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -51,6 +51,17 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
 
        public OneInputStreamOperatorTestHarness(
                OneInputStreamOperator<IN, OUT> operator,
+               int maxParallelism,
+               int parallelism,
+               int subtaskIndex,
+               TypeSerializer<IN> typeSerializerIn) throws Exception {
+               this(operator, maxParallelism, parallelism, subtaskIndex);
+
+               
config.setTypeSerializerIn1(Preconditions.checkNotNull(typeSerializerIn));
+       }
+
+       public OneInputStreamOperatorTestHarness(
+               OneInputStreamOperator<IN, OUT> operator,
                TypeSerializer<IN> typeSerializerIn,
                Environment environment) throws Exception {
                this(operator, environment);

Reply via email to