Repository: kafka
Updated Branches:
  refs/heads/trunk 066bfc314 -> 22f742cdd


MINOR: Stabilize flaky smoke system tests before KIP-91

This is a workaround until KIP-91 is merged. We tried increasing the timeout 
multiple times already but tests are still flaky.

Author: Matthias J. Sax <matth...@confluent.io>

Reviewers: Bill Bejeck <b...@confluent.io>, Apurva Mehta <apu...@confluent.io>, 
Guozhang Wang <wangg...@gmail.com>

Closes #4329 from mjsax/hotfix-system-tests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/22f742cd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/22f742cd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/22f742cd

Branch: refs/heads/trunk
Commit: 22f742cdd2899b76c1b4222863ee02ad3bc749a1
Parents: 066bfc3
Author: Matthias J. Sax <matth...@confluent.io>
Authored: Mon Dec 18 17:34:50 2017 -0800
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Mon Dec 18 17:34:50 2017 -0800

----------------------------------------------------------------------
 .../kafka/streams/tests/SmokeTestDriver.java    | 53 ++++++++++++++++----
 1 file changed, 43 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/22f742cd/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java 
b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 81dd66b..882e9c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.utils.Exit;
@@ -155,6 +156,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
 
         int remaining = data.length;
 
+        List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+
         while (remaining > 0) {
             int index = rand.nextInt(remaining);
             String key = data[index].key;
@@ -168,29 +171,59 @@ public class SmokeTestDriver extends SmokeTestUtil {
                 ProducerRecord<byte[], byte[]> record =
                         new ProducerRecord<>("data", 
stringSerde.serializer().serialize("", key), 
intSerde.serializer().serialize("", value));
 
-                producer.send(record, new Callback() {
-                    @Override
-                    public void onCompletion(final RecordMetadata metadata, 
final Exception exception) {
-                        if (exception != null) {
-                            exception.printStackTrace();
-                            Exit.exit(1);
-                        }
-                    }
-                });
-
+                producer.send(record, new TestCallback(record, needRetry));
 
                 numRecordsProduced++;
                 allData.get(key).add(value);
                 if (numRecordsProduced % 100 == 0)
                     System.out.println(numRecordsProduced + " records 
produced");
                 Utils.sleep(2);
+            }
+        }
+        producer.flush();
 
+        int remainingRetries = 5;
+        while (!needRetry.isEmpty()) {
+            final List<ProducerRecord<byte[], byte[]>> needRetry2 = new 
ArrayList<>();
+            for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+                producer.send(record, new TestCallback(record, needRetry2));
+            }
+            producer.flush();
+            needRetry = needRetry2;
+
+            if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+                System.err.println("Failed to produce all records after 
multiple retries");
+                Exit.exit(1);
             }
         }
+
         producer.close();
         return Collections.unmodifiableMap(allData);
     }
 
+    private static class TestCallback implements Callback {
+        private final ProducerRecord<byte[], byte[]> originalRecord;
+        private final List<ProducerRecord<byte[], byte[]>> needRetry;
+
+        TestCallback(final ProducerRecord<byte[], byte[]> originalRecord,
+                     final List<ProducerRecord<byte[], byte[]>> needRetry) {
+            this.originalRecord = originalRecord;
+            this.needRetry = needRetry;
+        }
+
+        @Override
+        public void onCompletion(final RecordMetadata metadata, final 
Exception exception) {
+            if (exception != null) {
+                if (exception instanceof TimeoutException) {
+                    needRetry.add(originalRecord);
+                } else {
+                    exception.printStackTrace();
+                    Exit.exit(1);
+                }
+            }
+        }
+    }
+
     private static void shuffle(int[] data, int windowSize) {
         Random rand = new Random();
         for (int i = 0; i < data.length; i++) {

Reply via email to