This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 67fa3656cc4 MINOR: Fix Streams Position thread-safety (#19480)
67fa3656cc4 is described below
commit 67fa3656cc4b8329c914ec3f255abae37adeca3c
Author: Chris Flood <[email protected]>
AuthorDate: Wed Apr 16 15:45:38 2025 -0400
MINOR: Fix Streams Position thread-safety (#19480)
* Fixes a thread-safety bug in the Kafka Streams Position class
* Adds a multithreaded test to validate the fix and prevent regressions
Reviewers: John Roesler <[email protected]>
---
.../org/apache/kafka/streams/query/Position.java | 8 +-
.../apache/kafka/streams/query/PositionTest.java | 127 +++++++++++++++++++++
2 files changed, 129 insertions(+), 6 deletions(-)
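For context, the bug is a classic check-then-act race: the old merge() tested a ConcurrentHashMap entry and then wrote it in separate steps, so two threads could interleave and let a smaller offset overwrite a larger one. Below is a minimal standalone sketch of the racy pattern and an atomic replacement (hypothetical class, not part of this commit; Position's withComponent is assumed to use a compute-style update along these lines):

    import java.util.concurrent.ConcurrentHashMap;

    public class MaxOffsetSketch {
        private final ConcurrentHashMap<Integer, Long> partitionMap = new ConcurrentHashMap<>();

        // Racy check-then-act: two threads can both pass the check before
        // either puts, so the second put may install the smaller offset.
        public void racyUpdate(final int partition, final long offset) {
            if (!partitionMap.containsKey(partition) || partitionMap.get(partition) < offset) {
                partitionMap.put(partition, offset);
            }
        }

        // Atomic alternative: compute() applies the max-update under the
        // map's per-key lock, so the stored offset can only move forward.
        public void atomicUpdate(final int partition, final long offset) {
            partitionMap.compute(partition, (k, prior) -> prior == null || prior < offset ? offset : prior);
        }
    }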
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/Position.java b/streams/src/main/java/org/apache/kafka/streams/query/Position.java
index 089bca12cdc..94acd9d8adf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/Position.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/Position.java
@@ -104,15 +104,11 @@ public class Position {
        } else {
            for (final Entry<String, ConcurrentHashMap<Integer, Long>> entry : other.position.entrySet()) {
                final String topic = entry.getKey();
-                final Map<Integer, Long> partitionMap =
-                    position.computeIfAbsent(topic, k -> new ConcurrentHashMap<>());
+
                for (final Entry<Integer, Long> partitionOffset : entry.getValue().entrySet()) {
                    final Integer partition = partitionOffset.getKey();
                    final Long offset = partitionOffset.getValue();
-                    if (!partitionMap.containsKey(partition)
-                        || partitionMap.get(partition) < offset) {
-                        partitionMap.put(partition, offset);
-                    }
+                    withComponent(topic, partition, offset);
                }
            }
            return this;
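The replacement call, withComponent, is not itself shown in this patch; judging from the API as exercised in the test below, it applies the per-partition maximum in a single atomic step, so concurrent writers cannot regress an offset. A small hypothetical usage example of the invariant the new test asserts (method names as used in the diff; the fluent return and exact values are assumed from the monotonicity guarantee):

    final Position position = Position.emptyPosition();
    position.withComponent("topic-0", 0, 10L);
    position.withComponent("topic-0", 0, 5L);   // no effect: 5 < 10
    position.merge(Position.emptyPosition().withComponent("topic-0", 0, 7L)); // still 10
    // position.getPartitionPositions("topic-0").get(0) == 10L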
diff --git a/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java b/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java
index c994c691d2f..a65c83684ea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java
@@ -19,11 +19,21 @@ package org.apache.kafka.streams.query;
import org.junit.jupiter.api.Test;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -32,9 +42,12 @@ import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class PositionTest {
+ private static final Random RANDOM = new Random();
+
@Test
public void shouldCreateFromMap() {
final Map<String, Map<Integer, Long>> map = mkMap(
@@ -221,4 +234,118 @@ public class PositionTest {
final HashMap<Position, Integer> map = new HashMap<>();
        assertThrows(UnsupportedOperationException.class, () -> map.put(position, 5));
}
+
+    @Test
+    public void shouldMonotonicallyIncreasePartitionPosition() throws InterruptedException, ExecutionException, TimeoutException {
+        final int threadCount = 10;
+        final int maxTopics = 50;
+        final int maxPartitions = 50;
+        final int maxOffset = 1000;
+        final CountDownLatch startLatch = new CountDownLatch(threadCount);
+        final Position mergePosition = Position.emptyPosition();
+        final Position withComponentPosition = Position.emptyPosition();
+        final List<Future<?>> futures = new ArrayList<>();
+        ExecutorService executorService = null;
+
+        try {
+            executorService = Executors.newFixedThreadPool(threadCount);
+
+            for (int i = 0; i < threadCount; i++) {
+                futures.add(executorService.submit(() -> {
+                    final Position threadPosition = Position.emptyPosition();
+                    final int topicCount = RANDOM.nextInt(maxTopics) + 1;
+
+                    // build the thread's position
+                    for (int topicNum = 0; topicNum < topicCount; topicNum++) {
+                        final String topic = "topic-" + topicNum;
+                        final int partitionCount = RANDOM.nextInt(maxPartitions) + 1;
+                        for (int partitionNum = 0; partitionNum < partitionCount; partitionNum++) {
+                            final long offset = RANDOM.nextInt(maxOffset) + 1;
+                            threadPosition.withComponent(topic, partitionNum, offset);
+                        }
+                    }
+
+                    startLatch.countDown();
+                    try {
+                        startLatch.await();
+                    } catch (final InterruptedException e) {
+                        // convert to unchecked exception so the future completes exceptionally and fails the test
+                        throw new RuntimeException(e);
+                    }
+
+                    // merge with the shared position
+                    mergePosition.merge(threadPosition);
+                    // duplicate the shared position to get a snapshot of its state
+                    final Position threadMergePositionState = mergePosition.copy();
+
+                    // update the shared position using withComponent
+                    for (final String topic : threadPosition.getTopics()) {
+                        for (final Map.Entry<Integer, Long> partitionOffset : threadPosition
+                            .getPartitionPositions(topic)
+                            .entrySet()) {
+                            withComponentPosition.withComponent(topic, partitionOffset.getKey(), partitionOffset.getValue());
+                        }
+                    }
+                    // duplicate the shared position to get a snapshot of its state
+                    final Position threadWithComponentPositionState = withComponentPosition.copy();
+
+                    // validate that any offsets in the merged position and the withComponent position are >= the thread position
+                    for (final String topic : threadPosition.getTopics()) {
+                        final Map<Integer, Long> threadOffsets = threadPosition.getPartitionPositions(topic);
+                        final Map<Integer, Long> mergedOffsets = threadMergePositionState.getPartitionPositions(topic);
+                        final Map<Integer, Long> withComponentOffsets = threadWithComponentPositionState.getPartitionPositions(topic);
+
+                        for (final Map.Entry<Integer, Long> threadOffset : threadOffsets.entrySet()) {
+                            final int partition = threadOffset.getKey();
+                            final long offsetValue = threadOffset.getValue();
+
+                            // merge checks
+                            assertTrue(
+                                mergedOffsets.containsKey(partition),
+                                "merge method failure. Missing partition " + partition + " for topic " + topic
+                            );
+                            assertTrue(
+                                mergedOffsets.get(partition) >= offsetValue,
+                                "merge method failure. Offset for topic " +
+                                    topic +
+                                    " partition " +
+                                    partition +
+                                    " expected >= " +
+                                    offsetValue +
+                                    " but got " +
+                                    mergedOffsets.get(partition)
+                            );
+
+                            // withComponent checks
+                            assertTrue(
+                                withComponentOffsets.containsKey(partition),
+                                "withComponent method failure. Missing partition " + partition + " for topic " + topic
+                            );
+                            assertTrue(
+                                withComponentOffsets.get(partition) >= offsetValue,
+                                "withComponent method failure. Offset for topic " +
+                                    topic +
+                                    " partition " +
+                                    partition +
+                                    " expected >= " +
+                                    offsetValue +
+                                    " but got " +
+                                    withComponentOffsets.get(partition)
+                            );
+                        }
+                    }
+                }));
+            }
+
+            for (final Future<?> future : futures) {
+                // Wait for all threads to complete
+                future.get(1, TimeUnit.SECONDS); // Check for exceptions
+            }
+        } finally {
+            if (executorService != null) {
+                executorService.shutdown();
+                assertTrue(executorService.awaitTermination(10, TimeUnit.SECONDS));
+            }
+        }
+    }
}
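The test uses the CountDownLatch as a start barrier so all ten worker threads hit the shared Position objects at roughly the same instant, maximizing the chance of exposing the lost-update interleaving; the copy() snapshots give each thread a consistent view to assert against, and any assertion failure inside a worker surfaces through Future.get(). To run just this test class, a Gradle invocation along these lines should work (command assumed, not part of the commit): ./gradlew :streams:test --tests org.apache.kafka.streams.query.PositionTest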