This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 48d60efe980 KAFKA-17990: Deflake testShareAutoOffsetResetDefaultValue
(#17916)
48d60efe980 is described below
commit 48d60efe9802786451458aec9136b7b585fecd92
Author: Andrew Schofield <[email protected]>
AuthorDate: Tue Nov 26 04:48:29 2024 +0000
KAFKA-17990: Deflake testShareAutoOffsetResetDefaultValue (#17916)
ShareConsumerTest.testShareAutoOffsetResetDefaultValue has been tightened
up by making sure that records produced have been flushed before starting
consumption. A possible but unlikely race condition seems the source of the
flakiness and this should now be eliminated in the previous PR to this test
case.
Reviewers: Manikumar Reddy <[email protected]>
---
.../java/kafka/test/api/ShareConsumerTest.java | 47 +++++++++++-----------
1 file changed, 23 insertions(+), 24 deletions(-)
diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
index fca9cffca86..c3e97d16d24 100644
--- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
+++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
@@ -293,7 +293,7 @@ public class ShareConsumerTest {
ProducerRecord<byte[], byte[]> record2 = new
ProducerRecord<>(tp2.topic(), tp2.partition(), null, "key".getBytes(),
"value".getBytes());
producer.send(record2).get();
producer.flush();
- shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
+ shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgementCommitCallback(partitionOffsetsMap,
partitionExceptionMap));
shareConsumer.subscribe(Collections.singleton(tp.topic()));
@@ -328,7 +328,7 @@ public class ShareConsumerTest {
producer.send(record);
producer.flush();
- shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
+ shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgementCommitCallback(partitionOffsetsMap,
partitionExceptionMap));
shareConsumer.subscribe(Collections.singleton(tp.topic()));
TestUtils.waitForCondition(() ->
shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
@@ -356,7 +356,7 @@ public class ShareConsumerTest {
ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
producer.send(record);
producer.flush();
- shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
+ shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgementCommitCallback(partitionOffsetsMap,
partitionExceptionMap));
shareConsumer.subscribe(Collections.singleton(tp.topic()));
ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
@@ -388,7 +388,7 @@ public class ShareConsumerTest {
producer.send(record);
producer.flush();
- shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
+ shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgementCommitCallback(partitionOffsetsMap,
partitionExceptionMap));
shareConsumer.subscribe(Collections.singleton(tp.topic()));
ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
@@ -404,12 +404,12 @@ public class ShareConsumerTest {
}
}
- private static class TestableAcknowledgeCommitCallback implements
AcknowledgementCommitCallback {
+ private static class TestableAcknowledgementCommitCallback implements
AcknowledgementCommitCallback {
private final Map<TopicPartition, Set<Long>> partitionOffsetsMap;
private final Map<TopicPartition, Exception> partitionExceptionMap;
- public TestableAcknowledgeCommitCallback(Map<TopicPartition,
Set<Long>> partitionOffsetsMap,
- Map<TopicPartition,
Exception> partitionExceptionMap) {
+ public TestableAcknowledgementCommitCallback(Map<TopicPartition,
Set<Long>> partitionOffsetsMap,
+ Map<TopicPartition,
Exception> partitionExceptionMap) {
this.partitionOffsetsMap = partitionOffsetsMap;
this.partitionExceptionMap = partitionExceptionMap;
}
@@ -623,7 +623,7 @@ public class ShareConsumerTest {
Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new
HashMap<>();
Map<TopicPartition, Exception> partitionExceptionMap1 = new
HashMap<>();
- shareConsumer1.setAcknowledgementCommitCallback(new
TestableAcknowledgeCommitCallback(partitionOffsetsMap1,
partitionExceptionMap1));
+ shareConsumer1.setAcknowledgementCommitCallback(new
TestableAcknowledgementCommitCallback(partitionOffsetsMap1,
partitionExceptionMap1));
ConsumerRecords<byte[], byte[]> records =
shareConsumer1.poll(Duration.ofMillis(5000));
assertEquals(3, records.count());
@@ -678,7 +678,7 @@ public class ShareConsumerTest {
Map<TopicPartition, Set<Long>> partitionOffsetsMap = new
HashMap<>();
Map<TopicPartition, Exception> partitionExceptionMap = new
HashMap<>();
- shareConsumer1.setAcknowledgementCommitCallback(new
TestableAcknowledgeCommitCallback(partitionOffsetsMap, partitionExceptionMap));
+ shareConsumer1.setAcknowledgementCommitCallback(new
TestableAcknowledgementCommitCallback(partitionOffsetsMap,
partitionExceptionMap));
ConsumerRecords<byte[], byte[]> records =
shareConsumer1.poll(Duration.ofMillis(5000));
assertEquals(3, records.count());
@@ -870,7 +870,7 @@ public class ShareConsumerTest {
Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new
HashMap<>();
Map<TopicPartition, Exception> partitionExceptionMap1 = new
HashMap<>();
- shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgeCommitCallback(partitionOffsetsMap1,
partitionExceptionMap1));
+ shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgementCommitCallback(partitionOffsetsMap1,
partitionExceptionMap1));
ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(3, records.count());
@@ -1328,11 +1328,11 @@ public class ShareConsumerTest {
/**
* Test to verify that the acknowledgement commit callback cannot invoke
methods of KafkaShareConsumer.
- * The exception thrown is verified in {@link
TestableAcknowledgeCommitCallbackWithShareConsumer}
+ * The exception thrown is verified in {@link
TestableAcknowledgementCommitCallbackWithShareConsumer}
*/
@ParameterizedTest(name = "{displayName}.persister={0}")
@ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
- public void
testAcknowledgeCommitCallbackCallsShareConsumerDisallowed(String persister) {
+ public void
testAcknowledgementCommitCallbackCallsShareConsumerDisallowed(String persister)
{
alterShareAutoOffsetReset("group1", "earliest");
try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
@@ -1341,7 +1341,7 @@ public class ShareConsumerTest {
producer.send(record);
producer.flush();
- shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgeCommitCallbackWithShareConsumer<>(shareConsumer));
+ shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgementCommitCallbackWithShareConsumer<>(shareConsumer));
shareConsumer.subscribe(Collections.singleton(tp.topic()));
// The acknowledgment commit callback will try to call a method of
KafkaShareConsumer
@@ -1353,10 +1353,10 @@ public class ShareConsumerTest {
}
}
- private class TestableAcknowledgeCommitCallbackWithShareConsumer<K, V>
implements AcknowledgementCommitCallback {
+ private class TestableAcknowledgementCommitCallbackWithShareConsumer<K, V>
implements AcknowledgementCommitCallback {
private final KafkaShareConsumer<K, V> shareConsumer;
-
TestableAcknowledgeCommitCallbackWithShareConsumer(KafkaShareConsumer<K, V>
shareConsumer) {
+
TestableAcknowledgementCommitCallbackWithShareConsumer(KafkaShareConsumer<K, V>
shareConsumer) {
this.shareConsumer = shareConsumer;
}
@@ -1376,7 +1376,7 @@ public class ShareConsumerTest {
@Flaky("KAFKA-18033")
@ParameterizedTest(name = "{displayName}.persister={0}")
@ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
- public void testAcknowledgeCommitCallbackCallsShareConsumerWakeup(String
persister) throws InterruptedException {
+ public void
testAcknowledgementCommitCallbackCallsShareConsumerWakeup(String persister)
throws InterruptedException {
alterShareAutoOffsetReset("group1", "earliest");
try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
@@ -1386,7 +1386,7 @@ public class ShareConsumerTest {
producer.flush();
// The acknowledgment commit callback will try to call a method of
KafkaShareConsumer
- shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgeCommitCallbackWakeup<>(shareConsumer));
+ shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgementCommitCallbackWakeup<>(shareConsumer));
shareConsumer.subscribe(Collections.singleton(tp.topic()));
TestUtils.waitForCondition(() ->
shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
@@ -1409,10 +1409,10 @@ public class ShareConsumerTest {
}
}
- private static class TestableAcknowledgeCommitCallbackWakeup<K, V>
implements AcknowledgementCommitCallback {
+ private static class TestableAcknowledgementCommitCallbackWakeup<K, V>
implements AcknowledgementCommitCallback {
private final KafkaShareConsumer<K, V> shareConsumer;
- TestableAcknowledgeCommitCallbackWakeup(KafkaShareConsumer<K, V>
shareConsumer) {
+ TestableAcknowledgementCommitCallbackWakeup(KafkaShareConsumer<K, V>
shareConsumer) {
this.shareConsumer = shareConsumer;
}
@@ -1429,7 +1429,7 @@ public class ShareConsumerTest {
@Flaky("KAFKA-18033")
@ParameterizedTest(name = "{displayName}.persister={0}")
@ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
- public void testAcknowledgeCommitCallbackThrowsException(String persister)
throws InterruptedException {
+ public void testAcknowledgementCommitCallbackThrowsException(String
persister) throws InterruptedException {
alterShareAutoOffsetReset("group1", "earliest");
try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
@@ -1438,7 +1438,7 @@ public class ShareConsumerTest {
producer.send(record);
producer.flush();
- shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgeCommitCallbackThrows<>());
+ shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgementCommitCallbackThrows<>());
shareConsumer.subscribe(Collections.singleton(tp.topic()));
TestUtils.waitForCondition(() ->
shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
@@ -1456,10 +1456,10 @@ public class ShareConsumerTest {
}
}
- private static class TestableAcknowledgeCommitCallbackThrows<K, V>
implements AcknowledgementCommitCallback {
+ private static class TestableAcknowledgementCommitCallbackThrows<K, V>
implements AcknowledgementCommitCallback {
@Override
public void onComplete(Map<TopicIdPartition, Set<Long>> offsetsMap,
Exception exception) {
- throw new
org.apache.kafka.common.errors.OutOfOrderSequenceException("Exception thrown in
TestableAcknowledgeCommitCallbackThrows.onComplete");
+ throw new
org.apache.kafka.common.errors.OutOfOrderSequenceException("Exception thrown in
TestableAcknowledgementCommitCallbackThrows.onComplete");
}
}
@@ -1673,7 +1673,6 @@ public class ShareConsumerTest {
}
}
- @Flaky("KAFKA-18033")
@ParameterizedTest(name = "{displayName}.persister={0}")
@ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
public void testShareAutoOffsetResetDefaultValue(String persister) {