Repository: flink
Updated Branches:
  refs/heads/master b0a7a1b81 -> 1a34f2165


[hotfix] [kafka consumer] Increase Kafka test stability


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2728f924
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2728f924
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2728f924

Branch: refs/heads/master
Commit: 2728f924cf64cd62929b8f0e394a1d4335af8156
Parents: b0a7a1b
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Apr 13 10:38:37 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Apr 13 20:50:48 2016 +0200

----------------------------------------------------------------------
 .../connectors/kafka/KafkaConsumerTestBase.java        | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2728f924/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 8ff67b4..a65a411 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1278,13 +1278,24 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                                                        new 
Tuple2Partitioner(parallelism)))
                                        .setParallelism(parallelism);
 
-                       writeEnv.execute("Write sequence");
+                       try {
+                               writeEnv.execute("Write sequence");
+                       }
+                       catch (Exception e) {
+                               LOG.error("Write attempt failed, trying again", 
e);
+                               deleteTestTopic(topicName);
+                               
JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
+                               continue;
+                       }
+                       
                        LOG.info("Finished writing sequence");
 
                        // -------- Validate the Sequence --------
                        
                        // we need to validate the sequence, because kafka's 
producers are not exactly once
                        LOG.info("Validating sequence");
+
+                       
JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
                        
                        final StreamExecutionEnvironment readEnv = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
                        
readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());

Reply via email to