jeffkbkim commented on code in PR #15534:
URL: https://github.com/apache/kafka/pull/15534#discussion_r1534979427


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -2591,6 +2599,74 @@ public void testPartitionLoadGeneratesSnapshotAtHighWatermarkNoRecordsLoaded() {
         assertTrue(ctx.coordinator.snapshotRegistry().hasSnapshot(0L));
     }
 
+    @Test
+    public void testHighWatermarkUpdate() {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+        ManualEventProcessor processor = new ManualEventProcessor();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(processor)
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .build();
+
+        // Loads the coordinator. Poll once to execute the load operation and once
+        // to complete the load.
+        runtime.scheduleLoadOperation(TP, 10);
+        processor.poll();
+        processor.poll();
+
+        // Write #1.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(Collections.singletonList("record1"), "response1")
+        );
+        processor.poll();
+
+        // Write #2.
+        CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(Collections.singletonList("record2"), "response2")
+        );
+        processor.poll();
+
+        // Records have been written to the log.
+        assertEquals(Arrays.asList(
+            InMemoryPartitionWriter.LogEntry.value("record1"),
+            InMemoryPartitionWriter.LogEntry.value("record2")
+        ), writer.entries(TP));
+
+        // There is no pending high watermark.
+        assertEquals(-1, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
+
+        // Commit the first record.
+        writer.commit(TP, 1);
+
+        // We should have one pending event and the pending high watermark should be set.
+        assertEquals(1, processor.size());
+        assertEquals(1, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
+
+        // Commit the second record.
+        writer.commit(TP, 2);
+
+        // We should still have one pending event and the pending high watermark should be updated.
+        assertEquals(1, processor.size());
+        assertEquals(2, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());
+
+        // Poll once to process the high watermark update and complete the writes.
+        processor.poll();
+
+        assertEquals(-1, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());

Review Comment:
   can we add
   ```
   assertEquals(2, runtime.contextOrThrow(TP).coordinator.lastCommittedOffset());
   ```
   below to confirm the last committed offset is updated accordingly?
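
   For reference, a rough sketch of how the tail of the test could look with that assertion folded in, assuming `lastCommittedOffset()` is exposed on the coordinator as in the snippet above:
   ```
   // Poll once to process the high watermark update and complete the writes.
   processor.poll();

   // The pending high watermark is cleared once the update has been applied.
   assertEquals(-1, runtime.contextOrThrow(TP).highWatermarklistener.lastHighWatermark());

   // Suggested addition: the last committed offset should now point at the
   // second committed record.
   assertEquals(2, runtime.contextOrThrow(TP).coordinator.lastCommittedOffset());
   ```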


