jeffkbkim commented on code in PR #15534: URL: https://github.com/apache/kafka/pull/15534#discussion_r1534979427
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ########## @@ -2591,6 +2599,74 @@ public void testPartitionLoadGeneratesSnapshotAtHighWatermarkNoRecordsLoaded() { assertTrue(ctx.coordinator.snapshotRegistry().hasSnapshot(0L)); } + @Test + public void testHighWatermarkUpdate() { + MockTimer timer = new MockTimer(); + MockPartitionWriter writer = new MockPartitionWriter(); + ManualEventProcessor processor = new ManualEventProcessor(); + + CoordinatorRuntime<MockCoordinatorShard, String> runtime = + new CoordinatorRuntime.Builder<MockCoordinatorShard, String>() + .withTime(timer.time()) + .withTimer(timer) + .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT) + .withLoader(new MockCoordinatorLoader()) + .withEventProcessor(processor) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) + .build(); + + // Loads the coordinator. Poll once to execute the load operation and once + // to complete the load. + runtime.scheduleLoadOperation(TP, 10); + processor.poll(); + processor.poll(); + + // Write #1. + CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT, + state -> new CoordinatorResult<>(Collections.singletonList("record1"), "response1") + ); + processor.poll(); + + // Write #2. + CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT, + state -> new CoordinatorResult<>(Collections.singletonList("record2"), "response2") + ); + processor.poll(); + + // Records have been written to the log. + assertEquals(Arrays.asList( + InMemoryPartitionWriter.LogEntry.value("record1"), + InMemoryPartitionWriter.LogEntry.value("record2") + ), writer.entries(TP)); + + // There is no pending high watermark. + assertEquals(-1, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark()); + + // Commit the first record. + writer.commit(TP, 1); + + // We should have one pending event and the pending high watermark should be set. + assertEquals(1, processor.size()); + assertEquals(1, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark()); + + // Commit the second record. + writer.commit(TP, 2); + + // We should still have one pending event and the pending high watermark should be updated. + assertEquals(1, processor.size()); + assertEquals(2, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark()); + + // Poll once to process the high watermark update and complete the writes. + processor.poll(); + + assertEquals(-1, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark()); Review Comment: can we add ``` assertEquals(2, runtime.contextOrThrow(TP).coordinator.lastCommittedOffset()); ``` below to confirm the last committed offset is updated accordingly? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org