dajac commented on code in PR #16215: URL: https://github.com/apache/kafka/pull/16215#discussion_r1635332216
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ########## @@ -3664,6 +3664,87 @@ public void testScheduleTransactionalWriteOperationWithBatching() throws Executi assertNull(complete1.get(5, TimeUnit.SECONDS)); } + @Test + public void testStateMachineIsReloadedWhenOutOfSync() { + MockTimer timer = new MockTimer(); + MockCoordinatorLoader loader = spy(new MockCoordinatorLoader()); + MockPartitionWriter writer = new MockPartitionWriter() { + @Override + public long append( + TopicPartition tp, + VerificationGuard verificationGuard, + MemoryRecords batch + ) { + // Add 1 to the returned offsets. + return super.append(tp, verificationGuard, batch) + 1; + } + }; + + CoordinatorRuntime<MockCoordinatorShard, String> runtime = + new CoordinatorRuntime.Builder<MockCoordinatorShard, String>() + .withTime(timer.time()) + .withTimer(timer) + .withDefaultWriteTimeOut(Duration.ofMillis(20)) + .withLoader(loader) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) + .withSerializer(new StringSerializer()) + .withAppendLingerMs(10) + .build(); + + // Schedule the loading. + runtime.scheduleLoadOperation(TP, 10); + + // Verify the initial state. + CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); + assertEquals(0L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + assertNull(ctx.currentBatch); + + // Get the max batch size. + int maxBatchSize = writer.config(TP).maxMessageSize(); + + // Create records with a quarter of the max batch size each. Keep in mind that + // each batch has a header so it is not possible to have those four records + // in one single batch. + List<String> records = Stream.of('1', '2', '3', '4').map(c -> { + char[] payload = new char[maxBatchSize / 4]; + Arrays.fill(payload, c); + return new String(payload); + }).collect(Collectors.toList()); + + // Write #1. + CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20), + state -> new CoordinatorResult<>(records.subList(0, 1), "response1")); + + // Write #2. + CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20), + state -> new CoordinatorResult<>(records.subList(1, 2), "response2")); + + // Write #3. + CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20), + state -> new CoordinatorResult<>(records.subList(2, 3), "response3")); + + // Write #4. This write cannot make it in the current batch. So the current batch + // is flushed. It will fail. So we expect all writes to fail. + CompletableFuture<String> write4 = runtime.scheduleWriteOperation("write#4", TP, Duration.ofMillis(20), + state -> new CoordinatorResult<>(records.subList(3, 4), "response4")); + + // Verify the futures. + assertFutureThrows(write1, NotCoordinatorException.class); Review Comment: > the only other thing I would wonder is if there is a way to confirm the coordinator is ready for writes again. Extended the unit test to validate the state and to ensure that the state machine is actually different from the previous one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org