This is an automated email from the ASF dual-hosted git repository.

gwenshap pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new bb8de0b  KAFKA-8003; Fix flaky testFencingOnTransactionExpiration
bb8de0b is described below

commit bb8de0b8c5f98f7a9d6b5ae7342ba7a0e1af8868
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri Jun 7 12:51:51 2019 -0700

    KAFKA-8003; Fix flaky testFencingOnTransactionExpiration

    We see this failure from time to time:
    ```
    java.lang.AssertionError: expected:<1> but was:<0>
        at org.junit.Assert.fail(Assert.java:89)
        at org.junit.Assert.failNotEquals(Assert.java:835)
        at org.junit.Assert.assertEquals(Assert.java:647)
        at org.junit.Assert.assertEquals(Assert.java:633)
        at kafka.api.TransactionsTest.testFencingOnTransactionExpiration(TransactionsTest.scala:512)
    ```
    The cause is probably that we are using `consumeRecordsFor` which has no expectation on the number of records to fetch and a timeout of just 1s. This patch changes the code to use `consumeRecords` and the default 15s timeout.

    Note we have also fixed a bug in the test case itself, which was using the wrong topic for the second write, which meant it could never have failed in the anticipated way anyway.

    Author: Jason Gustafson <ja...@confluent.io>

    Reviewers: Gwen Shapira

    Closes #6905 from hachikuji/fix-flaky-transaction-test
---
 .../test/scala/integration/kafka/api/TransactionsTest.scala    | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 375adaa..13ddd92 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -497,7 +497,7 @@ class TransactionsTest extends KafkaServerTestHarness {
 
     try {
       // Now that the transaction has expired, the second send should fail with a ProducerFencedException.
-      producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "2", willBeCommitted = false)).get()
+      producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "2", willBeCommitted = false)).get()
       fail("should have raised a ProducerFencedException since the transaction has expired")
     } catch {
       case _: ProducerFencedException =>
@@ -506,9 +506,13 @@ class TransactionsTest extends KafkaServerTestHarness {
     }
 
     // Verify that the first message was aborted and the second one was never written at all.
-    val nonTransactionalConsumer = nonTransactionalConsumers(0)
+    val nonTransactionalConsumer = nonTransactionalConsumers.head
     nonTransactionalConsumer.subscribe(List(topic1).asJava)
-    val records = TestUtils.consumeRecordsFor(nonTransactionalConsumer, 1000)
+
+    // Attempt to consume the one written record. We should not see the second. The
+    // assertion does not strictly guarantee that the record wasn't written, but the
+    // data is small enough that had it been written, it would have been in the first fetch.
+    val records = TestUtils.consumeRecords(nonTransactionalConsumer, numRecords = 1)
     assertEquals(1, records.size)
     assertEquals("1", TestUtils.recordValueAsString(records.head))
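
The difference between the two consumption styles named in the commit message can be sketched without a broker. The code below is a minimal illustration only, not the actual implementation of `TestUtils.consumeRecordsFor` or `TestUtils.consumeRecords`. The names `Poll`, `consumeFor`, `consumeAtLeast`, and `SlowSource` are hypothetical, and the clock is simulated (each poll is assumed to take 500 ms) so the outcome is deterministic.

```scala
import scala.collection.mutable.ArrayBuffer

object PollingSketch {
  // Hypothetical stand-in for a consumer: each call returns the next batch (possibly empty).
  type Poll[A] = () => Seq[A]

  // Fixed-window style: collect whatever arrives before the deadline.
  // Makes no promise about how many records come back.
  def consumeFor[A](poll: Poll[A], durationMs: Long, nowMs: () => Long): Seq[A] = {
    val deadline = nowMs() + durationMs
    val out = ArrayBuffer.empty[A]
    while (nowMs() < deadline) out ++= poll()
    out.toSeq
  }

  // Expected-count style: keep polling until numRecords have arrived,
  // and fail with a descriptive error if the timeout passes first.
  def consumeAtLeast[A](poll: Poll[A], numRecords: Int, timeoutMs: Long, nowMs: () => Long): Seq[A] = {
    val deadline = nowMs() + timeoutMs
    val out = ArrayBuffer.empty[A]
    while (out.size < numRecords && nowMs() < deadline) out ++= poll()
    if (out.size < numRecords)
      throw new AssertionError(s"Timed out: expected $numRecords records but got ${out.size}")
    out.toSeq
  }

  // Fake source (assumption for illustration): every poll advances a simulated
  // clock by 500 ms, and the single record "1" is delivered on the third poll.
  final class SlowSource {
    private var clockMs = 0L
    private var polls = 0
    val now: () => Long = () => clockMs
    val poll: Poll[String] = () => {
      polls += 1
      clockMs += 500
      if (polls == 3) Seq("1") else Seq.empty
    }
  }

  def main(args: Array[String]): Unit = {
    val a = new SlowSource
    // The 1000 ms window closes after two empty polls, so nothing is returned.
    println(consumeFor(a.poll, 1000, a.now).size)

    val b = new SlowSource
    // With an expected count and a 15000 ms budget, polling continues until the record arrives.
    println(consumeAtLeast(b.poll, 1, 15000, b.now).mkString(","))
  }
}
```

In this sketch the fixed-window call returns an empty result when the record shows up slightly late, which is the shape of the `expected:<1> but was:<0>` failure quoted above. The expected-count call either returns the record or fails with an explicit timeout message.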