Repository: kafka Updated Branches: refs/heads/trunk 066bfc314 -> 22f742cdd
MINOR: Stabilize flaky smoke system tests before KIP-91

This is a workaround until KIP-91 is merged. We tried increasing the timeout
multiple times already but tests are still flaky.

Author: Matthias J. Sax <matth...@confluent.io>

Reviewers: Bill Bejeck <b...@confluent.io>, Apurva Mehta <apu...@confluent.io>, Guozhang Wang <wangg...@gmail.com>

Closes #4329 from mjsax/hotfix-system-tests

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/22f742cd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/22f742cd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/22f742cd

Branch: refs/heads/trunk
Commit: 22f742cdd2899b76c1b4222863ee02ad3bc749a1
Parents: 066bfc3
Author: Matthias J. Sax <matth...@confluent.io>
Authored: Mon Dec 18 17:34:50 2017 -0800
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Mon Dec 18 17:34:50 2017 -0800

----------------------------------------------------------------------
 .../kafka/streams/tests/SmokeTestDriver.java    | 53 ++++++++++++++++----
 1 file changed, 43 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/22f742cd/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 81dd66b..882e9c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.utils.Exit;
@@ -155,6 +156,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
 
         int remaining = data.length;
 
+        List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+
         while (remaining > 0) {
             int index = rand.nextInt(remaining);
             String key = data[index].key;
@@ -168,29 +171,59 @@ public class SmokeTestDriver extends SmokeTestUtil {
                 ProducerRecord<byte[], byte[]> record =
                         new ProducerRecord<>("data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value));
 
-                producer.send(record, new Callback() {
-                    @Override
-                    public void onCompletion(final RecordMetadata metadata, final Exception exception) {
-                        if (exception != null) {
-                            exception.printStackTrace();
-                            Exit.exit(1);
-                        }
-                    }
-                });
-
+                producer.send(record, new TestCallback(record, needRetry));
                 numRecordsProduced++;
                 allData.get(key).add(value);
                 if (numRecordsProduced % 100 == 0)
                     System.out.println(numRecordsProduced + " records produced");
                 Utils.sleep(2);
             }
         }
+        producer.flush();
 
+        int remainingRetries = 5;
+        while (!needRetry.isEmpty()) {
+            final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+            for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+                producer.send(record, new TestCallback(record, needRetry2));
+            }
+            producer.flush();
+            needRetry = needRetry2;
+
+            if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+                System.err.println("Failed to produce all records after multiple retries");
+                Exit.exit(1);
+            }
+        }
+
         producer.close();
         return Collections.unmodifiableMap(allData);
     }
 
+    private static class TestCallback implements Callback {
+        private final ProducerRecord<byte[], byte[]> originalRecord;
+        private final List<ProducerRecord<byte[], byte[]>> needRetry;
+
+        TestCallback(final ProducerRecord<byte[], byte[]> originalRecord,
+                     final List<ProducerRecord<byte[], byte[]>> needRetry) {
+            this.originalRecord = originalRecord;
+            this.needRetry = needRetry;
+        }
+
+        @Override
+        public void onCompletion(final RecordMetadata metadata, final Exception exception) {
+            if (exception != null) {
+                if (exception instanceof TimeoutException) {
+                    needRetry.add(originalRecord);
+                } else {
+                    exception.printStackTrace();
+                    Exit.exit(1);
+                }
+            }
+        }
+    }
+
     private static void shuffle(int[] data, int windowSize) {
         Random rand = new Random();
         for (int i = 0; i < data.length; i++) {