This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ee550c4b770 KAFKA-16973; Fix caught-up condition (#16367)
ee550c4b770 is described below

commit ee550c4b770402a35f2972bbbe6b75d58af8c938
Author: David Jacot <dja...@confluent.io>
AuthorDate: Thu Jun 20 09:53:48 2024 +0200

    KAFKA-16973; Fix caught-up condition (#16367)
    
    When a write operation does not have any records, the coordinator runtime
    checks whether the state machine is caught up to decide whether the
    operation should wait until the state machine is committed up to the
    operation point or the operation should be completed. The current
    implementation assumes that there will always be a pending write operation
    waiting in the deferred queue when the state machine is not fully caught
    up yet. This is true except when the state machine has just been
    loaded [...]
    
    This patch fixes the issue by always comparing the last written offset
    with the last committed offset.
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 .../group/runtime/CoordinatorRuntime.java          |  6 +-
 .../group/runtime/CoordinatorRuntimeTest.java      | 83 ++++++++++++++++++++++
 2 files changed, 85 insertions(+), 4 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index ac594ebea05..21c5d624a6b 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -60,7 +60,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalInt;
-import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionException;
@@ -900,9 +899,8 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
                 if (currentBatch != null) {
                     currentBatch.deferredEvents.add(event);
                 } else {
-                    OptionalLong pendingOffset = 
deferredEventQueue.highestPendingOffset();
-                    if (pendingOffset.isPresent()) {
-                        deferredEventQueue.add(pendingOffset.getAsLong(), 
event);
+                    if (coordinator.lastCommittedOffset() < 
coordinator.lastWrittenOffset()) {
+                        
deferredEventQueue.add(coordinator.lastWrittenOffset(), event);
                     } else {
                         event.complete(null);
                     }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
index 616a4f646ad..7a2adb3e8b0 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
@@ -3756,6 +3756,89 @@ public class CoordinatorRuntimeTest {
         assertNotEquals(coordinator, ctx.coordinator);
     }
 
+    @Test
+    public void 
testWriteOpIsNotReleasedWhenStateMachineIsNotCaughtUpAfterLoad() throws 
ExecutionException, InterruptedException, TimeoutException {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+        CoordinatorLoader<String> loader = new CoordinatorLoader<String>() {
+            @Override
+            public CompletableFuture<LoadSummary> load(
+                TopicPartition tp,
+                CoordinatorPlayback<String> coordinator
+            ) {
+                coordinator.replay(
+                    0,
+                    RecordBatch.NO_PRODUCER_ID,
+                    RecordBatch.NO_PRODUCER_EPOCH,
+                    "record#0"
+                );
+
+                coordinator.replay(
+                    0,
+                    RecordBatch.NO_PRODUCER_ID,
+                    RecordBatch.NO_PRODUCER_EPOCH,
+                    "record#1"
+                );
+
+                coordinator.updateLastWrittenOffset(2L);
+                coordinator.updateLastCommittedOffset(1L);
+
+                return CompletableFuture.completedFuture(new LoadSummary(
+                    0L,
+                    0L,
+                    0L,
+                    2,
+                    1
+                ));
+            }
+
+            @Override
+            public void close() {}
+        };
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(loader)
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(1L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(2L), 
ctx.coordinator.snapshotRegistry().epochsList());
+
+        // Schedule a write operation that does not generate any records.
+        CompletableFuture<String> write = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(Collections.emptyList(), 
"response1"));
+
+        // The write operation should not be done.
+        assertFalse(write.isDone());
+
+        // Advance the last committed offset.
+        ctx.highWatermarklistener.onHighWatermarkUpdated(TP, 2L);
+
+        // Verify the state.
+        assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(2L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(2L), 
ctx.coordinator.snapshotRegistry().epochsList());
+
+        // The write operation should be completed.
+        assertEquals("response1", write.get(5, TimeUnit.SECONDS));
+    }
+
     private static <S extends CoordinatorShard<U>, U> 
ArgumentMatcher<CoordinatorPlayback<U>> coordinatorMatcher(
         CoordinatorRuntime<S, U> runtime,
         TopicPartition tp

Reply via email to