This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 67fa3656cc4 MINOR: Fix Streams Position thread-safety (#19480)
67fa3656cc4 is described below

commit 67fa3656cc4b8329c914ec3f255abae37adeca3c
Author: Chris Flood <[email protected]>
AuthorDate: Wed Apr 16 15:45:38 2025 -0400

    MINOR: Fix Streams Position thread-safety (#19480)
    
    * Fixes a thread-safety bug in the Kafka Streams Position class
    * Adds a multithreaded test to validate the fix and prevent regressions
    
    Reviewers: John Roesler <[email protected]>
---
 .../org/apache/kafka/streams/query/Position.java   |   8 +-
 .../apache/kafka/streams/query/PositionTest.java   | 127 +++++++++++++++++++++
 2 files changed, 129 insertions(+), 6 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/query/Position.java b/streams/src/main/java/org/apache/kafka/streams/query/Position.java
index 089bca12cdc..94acd9d8adf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/Position.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/Position.java
@@ -104,15 +104,11 @@ public class Position {
         } else {
             for (final Entry<String, ConcurrentHashMap<Integer, Long>> entry : 
other.position.entrySet()) {
                 final String topic = entry.getKey();
-                final Map<Integer, Long> partitionMap =
-                    position.computeIfAbsent(topic, k -> new 
ConcurrentHashMap<>());
+
                 for (final Entry<Integer, Long> partitionOffset : 
entry.getValue().entrySet()) {
                     final Integer partition = partitionOffset.getKey();
                     final Long offset = partitionOffset.getValue();
-                    if (!partitionMap.containsKey(partition)
-                        || partitionMap.get(partition) < offset) {
-                        partitionMap.put(partition, offset);
-                    }
+                    withComponent(topic, partition, offset);
                 }
             }
             return this;
diff --git a/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java b/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java
index c994c691d2f..a65c83684ea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java
@@ -19,11 +19,21 @@ package org.apache.kafka.streams.query;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -32,9 +42,12 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class PositionTest {
 
+    private static final Random RANDOM = new Random();
+
     @Test
     public void shouldCreateFromMap() {
         final Map<String, Map<Integer, Long>> map = mkMap(
@@ -221,4 +234,118 @@ public class PositionTest {
         final HashMap<Position, Integer> map = new HashMap<>();
         assertThrows(UnsupportedOperationException.class, () -> 
map.put(position, 5));
     }
+
+    @Test
+    public void shouldMonotonicallyIncreasePartitionPosition() throws 
InterruptedException, ExecutionException, TimeoutException {
+        final int threadCount = 10;
+        final int maxTopics = 50;
+        final int maxPartitions = 50;
+        final int maxOffset = 1000;
+        final CountDownLatch startLatch = new CountDownLatch(threadCount);
+        final Position mergePosition = Position.emptyPosition();
+        final Position withComponentPosition = Position.emptyPosition();
+        final List<Future<?>> futures = new ArrayList<>();
+        ExecutorService executorService = null;
+
+        try {
+            executorService = Executors.newFixedThreadPool(threadCount);
+
+            for (int i = 0; i < threadCount; i++) {
+                futures.add(executorService.submit(() -> {
+                    final Position threadPosition = Position.emptyPosition();
+                    final int topicCount = RANDOM.nextInt(maxTopics) + 1;
+
+                    // build the thread's position
+                    for (int topicNum = 0; topicNum < topicCount; topicNum++) {
+                        final String topic = "topic-" + topicNum;
+                        final int partitionCount = 
RANDOM.nextInt(maxPartitions) + 1;
+                        for (int partitionNum = 0; partitionNum < 
partitionCount; partitionNum++) {
+                            final long offset = RANDOM.nextInt(maxOffset) + 1;
+                            threadPosition.withComponent(topic, partitionNum, 
offset);
+                        }
+                    }
+
+                    startLatch.countDown();
+                    try {
+                        startLatch.await();
+                    } catch (final InterruptedException e) {
+                        // convert to unchecked exception so the future completes exceptionally and fails the test
+                        throw new RuntimeException(e);
+                    }
+
+                    // merge with the shared position
+                    mergePosition.merge(threadPosition);
+                    // duplicate the shared position to get a snapshot of its state
+                    final Position threadMergePositionState = 
mergePosition.copy();
+
+                    // update the shared position using withComponent
+                    for (final String topic : threadPosition.getTopics()) {
+                        for (final Map.Entry<Integer, Long> partitionOffset : 
threadPosition
+                                .getPartitionPositions(topic)
+                                .entrySet()) {
+                            withComponentPosition.withComponent(topic, 
partitionOffset.getKey(), partitionOffset.getValue());
+                        }
+                    }
+                    // duplicate the shared position to get a snapshot of its state
+                    final Position threadWithComponentPositionState = 
withComponentPosition.copy();
+
+                    // validate that any offsets in the merged position and the withComponent position are >= the thread position
+                    for (final String topic : threadPosition.getTopics()) {
+                        final Map<Integer, Long> threadOffsets = 
threadPosition.getPartitionPositions(topic);
+                        final Map<Integer, Long> mergedOffsets = 
threadMergePositionState.getPartitionPositions(topic);
+                        final Map<Integer, Long> withComponentOffsets = 
threadWithComponentPositionState.getPartitionPositions(topic);
+
+                        for (final Map.Entry<Integer, Long> threadOffset : 
threadOffsets.entrySet()) {
+                            final int partition = threadOffset.getKey();
+                            final long offsetValue = threadOffset.getValue();
+
+                            // merge checks
+                            assertTrue(
+                                    mergedOffsets.containsKey(partition),
+                                    "merge method failure. Missing partition " 
+ partition + " for topic " + topic
+                            );
+                            assertTrue(
+                                    mergedOffsets.get(partition) >= 
offsetValue,
+                                    "merge method failure. Offset for topic " +
+                                            topic +
+                                            " partition " +
+                                            partition +
+                                            " expected >= " +
+                                            offsetValue +
+                                            " but got " +
+                                            mergedOffsets.get(partition)
+                            );
+
+                            // withComponent checks
+                            assertTrue(
+                                    
withComponentOffsets.containsKey(partition),
+                                    "withComponent method failure. Missing 
partition " + partition + " for topic " + topic
+                            );
+                            assertTrue(
+                                    withComponentOffsets.get(partition) >= 
offsetValue,
+                                    "withComponent method failure. Offset for 
topic " +
+                                            topic +
+                                            " partition " +
+                                            partition +
+                                            " expected >= " +
+                                            offsetValue +
+                                            " but got " +
+                                            withComponentOffsets.get(partition)
+                            );
+                        }
+                    }
+                }));
+            }
+
+            for (final Future<?> future : futures) {
+                // Wait for all threads to complete
+                future.get(1, TimeUnit.SECONDS); // Check for exceptions
+            }
+        } finally {
+            if (executorService != null) {
+                executorService.shutdown();
+                assertTrue(executorService.awaitTermination(10, 
TimeUnit.SECONDS));
+            }
+        }
+    }
 }

Reply via email to