This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new 4b86ff9d00e KAFKA-18943: Update EosIntegrationTest for EOSv1 (#19312)
4b86ff9d00e is described below
commit 4b86ff9d00eaa1f93a99a60b6c321abbabbdb636
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Mar 31 10:56:58 2025 -0700
KAFKA-18943: Update EosIntegrationTest for EOSv1 (#19312)
After cherry-picking
https://github.com/apache/kafka/commit/2181ddbb039ff688f5ff41784d943cb579f7575c,
we realized that the newly added test does not cover EOSv1. This PR closes
this testing gap.
Reviewers: Lucas Brutschy <[email protected]>
---
.../streams/integration/EosIntegrationTest.java | 34 +++++++++++++++++-----
1 file changed, 27 insertions(+), 7 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 584fcb4a25a..c4fc8c434e8 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -73,7 +73,6 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
@@ -1001,12 +1000,25 @@ public class EosIntegrationTest {
}
- private final AtomicReference<String> transactionalProducerId = new
AtomicReference<>();
+ private final AtomicReference<Map<String, String>>
transactionalProducerIdEosV1 = new AtomicReference<>();
+ private final AtomicReference<String> transactionalProducerIdEosV2 = new
AtomicReference<>();
private class TestClientSupplier extends DefaultKafkaClientSupplier {
@Override
public Producer<byte[], byte[]> getProducer(final Map<String, Object>
config) {
- transactionalProducerId.compareAndSet(null, (String)
config.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
+ final String transactionalId = (String)
config.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+
+ if (transactionalId.endsWith("-0_0") ||
transactionalId.endsWith("-0_1")) {
+ Map<String, String> transactionalIds =
transactionalProducerIdEosV1.get();
+ if (transactionalIds == null) {
+ transactionalIds = new HashMap<>();
+ transactionalProducerIdEosV1.set(transactionalIds);
+ }
+
+
transactionalIds.put(transactionalId.substring(transactionalId.length() - 3),
transactionalId);
+ } else {
+ transactionalProducerIdEosV2.compareAndSet(null,
transactionalId);
+ }
return new KafkaProducer<>(config, new ByteArraySerializer(), new
ByteArraySerializer());
}
@@ -1015,8 +1027,12 @@ public class EosIntegrationTest {
static final AtomicReference<TaskId> TASK_WITH_DATA = new
AtomicReference<>();
static final AtomicBoolean DID_REVOKE_IDLE_TASK = new AtomicBoolean(false);
- @Test
- public void
shouldNotCommitActiveTasksWithPendingInputIfRevokedTaskDidNotMakeProgress()
throws Exception {
+ @ParameterizedTest
+ @ValueSource(strings = {StreamsConfig.EXACTLY_ONCE,
StreamsConfig.EXACTLY_ONCE_V2})
+ public void
shouldNotCommitActiveTasksWithPendingInputIfRevokedTaskDidNotMakeProgress(final
String eosConfig) throws Exception {
+ TASK_WITH_DATA.set(null);
+ DID_REVOKE_IDLE_TASK.set(false);
+
final AtomicBoolean requestCommit = new AtomicBoolean(false);
final StreamsBuilder builder = new StreamsBuilder();
@@ -1041,7 +1057,7 @@ public class EosIntegrationTest {
.to(SINGLE_PARTITION_OUTPUT_TOPIC);
final Properties properties = new Properties();
- properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
+ properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
Integer.MAX_VALUE);
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
1);
@@ -1104,7 +1120,11 @@ public class EosIntegrationTest {
final List<KeyValue<Long, Long>> inputDataTask0Fencing =
Collections.singletonList(KeyValue.pair(4L, -3L));
final Properties producerConfigs = new Properties();
-
producerConfigs.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
transactionalProducerId.get());
+ if (eosConfig.equals(StreamsConfig.EXACTLY_ONCE)) {
+
producerConfigs.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
transactionalProducerIdEosV1.get().get(TASK_WITH_DATA.get().toString()));
+ } else {
+
producerConfigs.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
transactionalProducerIdEosV2.get());
+ }
IntegrationTestUtils.produceKeyValuesSynchronously(
MULTI_PARTITION_INPUT_TOPIC,
inputDataTask0Fencing,