[FLINK-6988][kafka] Add test for failure before first checkpoint and scaling down
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4ada50b3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4ada50b3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4ada50b3 Branch: refs/heads/master Commit: 4ada50b3dd7c4af9735585a8c45eda4de10bb6e5 Parents: 867c012 Author: Piotr Nowojski <[email protected]> Authored: Thu Aug 24 14:16:55 2017 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Oct 9 18:58:36 2017 +0200 ---------------------------------------------------------------------- .../kafka/FlinkKafkaProducer011Tests.java | 114 ++++++++++++++++--- .../util/OneInputStreamOperatorTestHarness.java | 11 ++ 2 files changed, 108 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4ada50b3/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java index 51410da..dd21bf4 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011Tests.java @@ -37,6 +37,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import 
org.apache.kafka.common.errors.ProducerFencedException; import org.junit.Before; import org.junit.Test; @@ -258,26 +259,61 @@ public class FlinkKafkaProducer011Tests extends KafkaTestBase { deleteTestTopic(topic); } - private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(String topic) throws Exception { - Properties properties = createProperties(); + /** + * This test checks whether FlinkKafkaProducer011 correctly aborts lingering transactions after a failure + * that happened before the first checkpoint and was followed by a reduction in parallelism. + * If such transactions were left lingering, consumers would be unable to read committed records + * that were created after the lingering transaction. + */ + @Test(timeout = 120_000L) + public void testScaleDownBeforeFirstCheckpoint() throws Exception { + String topic = "scale-down-before-first-checkpoint"; + + List<AutoCloseable> operatorsToClose = new ArrayList<>(); + int preScaleDownParallelism = Math.max(2, FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR); + for (int subtaskIndex = 0; subtaskIndex < preScaleDownParallelism; subtaskIndex++) { + OneInputStreamOperatorTestHarness<Integer, Object> preScaleDownOperator = createTestHarness( + topic, + preScaleDownParallelism, + preScaleDownParallelism, + subtaskIndex); + + preScaleDownOperator.setup(); + preScaleDownOperator.open(); + preScaleDownOperator.processElement(subtaskIndex * 2, 0); + preScaleDownOperator.snapshot(0, 1); + preScaleDownOperator.processElement(subtaskIndex * 2 + 1, 2); + + operatorsToClose.add(preScaleDownOperator); + } - FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>( - topic, - integerKeyedSerializationSchema, - properties, - FlinkKafkaProducer011.Semantic.EXACTLY_ONCE); + // do not close the previous testHarnesses, to make sure that closing does not clean up anything (in case of failure + // there might not be any close) - return new OneInputStreamOperatorTestHarness<>( - new 
StreamSink<>(kafkaProducer), - IntSerializer.INSTANCE); - } + // After the previous failure, simulate restarting the application with a smaller parallelism + OneInputStreamOperatorTestHarness<Integer, Object> postScaleDownOperator1 = createTestHarness(topic, 1, 1, 0); - private Properties createProperties() { - Properties properties = new Properties(); - properties.putAll(standardProps); - properties.putAll(secureProps); - properties.put(FlinkKafkaProducer011.KEY_DISABLE_METRICS, "true"); - return properties; + postScaleDownOperator1.setup(); + postScaleDownOperator1.open(); + + // write and commit more records after the potentially lingering transactions + postScaleDownOperator1.processElement(46, 7); + postScaleDownOperator1.snapshot(4, 8); + postScaleDownOperator1.processElement(47, 9); + postScaleDownOperator1.notifyOfCompletedCheckpoint(4); + + // now we should have: + // - records 0 to (2 * preScaleDownParallelism - 1), written before the scale down, in aborted transactions + // - committed transaction with record 46 + // - pending transaction with record 47 + assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(46), 30_000L); + + postScaleDownOperator1.close(); + // ignore ProducerFencedExceptions, because postScaleDownOperator1 could reuse transactional ids. 
+ for (AutoCloseable operatorToClose : operatorsToClose) { + closeIgnoringProducerFenced(operatorToClose); + } + deleteTestTopic(topic); } @Test @@ -363,4 +399,48 @@ public class FlinkKafkaProducer011Tests extends KafkaTestBase { assertEquals(expectedValue, record.value()); } } + + private void closeIgnoringProducerFenced(AutoCloseable autoCloseable) throws Exception { + try { + autoCloseable.close(); + } + catch (Exception ex) { + if (!(ex.getCause() instanceof ProducerFencedException)) { + throw ex; + } + } + } + + private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(String topic) throws Exception { + return createTestHarness(topic, 1, 1, 0); + } + + private OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness( + String topic, + int maxParallelism, + int parallelism, + int subtaskIndex) throws Exception { + Properties properties = createProperties(); + + FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>( + topic, + integerKeyedSerializationSchema, + properties, + FlinkKafkaProducer011.Semantic.EXACTLY_ONCE); + + return new OneInputStreamOperatorTestHarness<>( + new StreamSink<>(kafkaProducer), + maxParallelism, + parallelism, + subtaskIndex, + IntSerializer.INSTANCE); + } + + private Properties createProperties() { + Properties properties = new Properties(); + properties.putAll(standardProps); + properties.putAll(secureProps); + properties.put(FlinkKafkaProducer011.KEY_DISABLE_METRICS, "true"); + return properties; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/4ada50b3/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index c8fa2a4..5c7d986 
100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -51,6 +51,17 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> public OneInputStreamOperatorTestHarness( OneInputStreamOperator<IN, OUT> operator, + int maxParallelism, + int parallelism, + int subtaskIndex, + TypeSerializer<IN> typeSerializerIn) throws Exception { + this(operator, maxParallelism, parallelism, subtaskIndex); + + config.setTypeSerializerIn1(Preconditions.checkNotNull(typeSerializerIn)); + } + + public OneInputStreamOperatorTestHarness( + OneInputStreamOperator<IN, OUT> operator, TypeSerializer<IN> typeSerializerIn, Environment environment) throws Exception { this(operator, environment);
