Repository: flink
Updated Branches:
  refs/heads/master b0a7a1b81 -> 1a34f2165
[hotfix] [kafka consumer] Increase Kafka test stability

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2728f924
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2728f924
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2728f924

Branch: refs/heads/master
Commit: 2728f924cf64cd62929b8f0e394a1d4335af8156
Parents: b0a7a1b
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Apr 13 10:38:37 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Apr 13 20:50:48 2016 +0200

----------------------------------------------------------------------
 .../connectors/kafka/KafkaConsumerTestBase.java | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2728f924/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 8ff67b4..a65a411 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1278,13 +1278,24 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 							new Tuple2Partitioner(parallelism)))
 					.setParallelism(parallelism);
 
-			writeEnv.execute("Write sequence");
+			try {
+				writeEnv.execute("Write sequence");
+			}
+			catch (Exception e) {
+				LOG.error("Write attempt failed, trying again", e);
+				deleteTestTopic(topicName);
+				JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+				continue;
+			}
+
 			LOG.info("Finished writing sequence");
 
 			// -------- Validate the Sequence --------
 
 			// we need to validate the sequence, because kafka's producers are not exactly once
 			LOG.info("Validating sequence");
+
+			JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
 
 			final StreamExecutionEnvironment readEnv = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
 			readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());