This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new ee550c4b770 KAFKA-16973; Fix caught-up condition (#16367)
ee550c4b770 is described below

commit ee550c4b770402a35f2972bbbe6b75d58af8c938
Author:     David Jacot <dja...@confluent.io>
AuthorDate: Thu Jun 20 09:53:48 2024 +0200

    KAFKA-16973; Fix caught-up condition (#16367)

    When a write operation does not have any records, the coordinator runtime
    checks whether the state machine is caught up in order to decide whether
    the operation must wait until the state machine has committed up to the
    operation's point, or whether it can be completed immediately. The current
    implementation assumes that there is always a pending write operation
    waiting in the deferred queue whenever the state machine is not fully
    caught up. This is true except when the state machine is just load [...]

    This patch fixes the issue by always comparing the last written offset
    against the last committed offset.

    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 .../group/runtime/CoordinatorRuntime.java     |  6 +-
 .../group/runtime/CoordinatorRuntimeTest.java | 83 ++++++++++++++++++++++
 2 files changed, 85 insertions(+), 4 deletions(-)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index ac594ebea05..21c5d624a6b 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -60,7 +60,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalInt;
-import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionException;
@@ -900,9 +899,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
             if (currentBatch != null) {
                 currentBatch.deferredEvents.add(event);
             } else {
-                OptionalLong pendingOffset = deferredEventQueue.highestPendingOffset();
-                if (pendingOffset.isPresent()) {
-                    deferredEventQueue.add(pendingOffset.getAsLong(), event);
+                if (coordinator.lastCommittedOffset() < coordinator.lastWrittenOffset()) {
+                    deferredEventQueue.add(coordinator.lastWrittenOffset(), event);
                 } else {
                     event.complete(null);
                 }
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
index 616a4f646ad..7a2adb3e8b0 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
@@ -3756,6 +3756,89 @@ public class CoordinatorRuntimeTest {
         assertNotEquals(coordinator, ctx.coordinator);
     }
 
+    @Test
+    public void testWriteOpIsNotReleasedWhenStateMachineIsNotCaughtUpAfterLoad() throws ExecutionException, InterruptedException, TimeoutException {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+        CoordinatorLoader<String> loader = new CoordinatorLoader<String>() {
+            @Override
+            public CompletableFuture<LoadSummary> load(
+                TopicPartition tp,
+                CoordinatorPlayback<String> coordinator
+            ) {
+                coordinator.replay(
+                    0,
+                    RecordBatch.NO_PRODUCER_ID,
+                    RecordBatch.NO_PRODUCER_EPOCH,
+                    "record#0"
+                );
+
+                coordinator.replay(
+                    0,
+                    RecordBatch.NO_PRODUCER_ID,
+                    RecordBatch.NO_PRODUCER_EPOCH,
+                    "record#1"
+                );
+
+                coordinator.updateLastWrittenOffset(2L);
+                coordinator.updateLastCommittedOffset(1L);
+
+                return CompletableFuture.completedFuture(new LoadSummary(
+                    0L,
+                    0L,
+                    0L,
+                    2,
+                    1
+                ));
+            }
+
+            @Override
+            public void close() {}
+        };
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(loader)
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(1L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(2L), ctx.coordinator.snapshotRegistry().epochsList());
+
+        // Schedule a write operation that does not generate any records.
+        CompletableFuture<String> write = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(Collections.emptyList(), "response1"));
+
+        // The write operation should not be done.
+        assertFalse(write.isDone());
+
+        // Advance the last committed offset.
+        ctx.highWatermarklistener.onHighWatermarkUpdated(TP, 2L);
+
+        // Verify the state.
+        assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(2L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(2L), ctx.coordinator.snapshotRegistry().epochsList());
+
+        // The write operation should be completed.
+        assertEquals("response1", write.get(5, TimeUnit.SECONDS));
+    }
+
     private static <S extends CoordinatorShard<U>, U> ArgumentMatcher<CoordinatorPlayback<U>> coordinatorMatcher(
         CoordinatorRuntime<S, U> runtime,
         TopicPartition tp
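
To make the fix concrete, below is a minimal, self-contained sketch of the completion decision for a record-less write in the just-loaded scenario the new test constructs. The class and its static fields are illustrative stand-ins invented for this sketch, not the actual Kafka types; only the two compared conditions mirror the patch above.

    import java.util.OptionalLong;
    import java.util.concurrent.CompletableFuture;

    // Hypothetical stand-in for the runtime state right after a load, as set
    // up in the test above: two records replayed, last written offset 2, but
    // only offset 1 committed, and an empty deferred queue.
    public class CaughtUpConditionSketch {

        static long lastWrittenOffset = 2L;
        static long lastCommittedOffset = 1L;

        // Right after a load no write is pending, so the queue is empty.
        static OptionalLong highestPendingOffset = OptionalLong.empty();

        // Old condition: defer the event only if another write is already
        // pending. With an empty queue this returns false even though offset 2
        // is not committed yet, so the event completed prematurely.
        static boolean shouldDeferOld() {
            return highestPendingOffset.isPresent();
        }

        // Fixed condition: defer whenever the state machine has written past
        // what is committed, independent of the deferred queue's contents.
        static boolean shouldDeferFixed() {
            return lastCommittedOffset < lastWrittenOffset;
        }

        public static void main(String[] args) {
            CompletableFuture<Void> event = new CompletableFuture<>();
            System.out.println("old condition defers:   " + shouldDeferOld());   // false -> premature completion
            System.out.println("fixed condition defers: " + shouldDeferFixed()); // true  -> waits for the commit

            if (!shouldDeferFixed()) {
                event.complete(null); // mirrors event.complete(null) in the patch
            }
            System.out.println("event done before commit: " + event.isDone()); // false under the fix
        }
    }

Once the high watermark advances to the last written offset (the test simulates this via ctx.highWatermarklistener.onHighWatermarkUpdated(TP, 2L)), the deferred event is released and the write completes, which is exactly the ordering the new test asserts.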