dajac commented on code in PR #16215:
URL: https://github.com/apache/kafka/pull/16215#discussion_r1633211339


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -3059,6 +3166,497 @@ public void testAppendRecordBatchSize() {
         assertTrue(batchSize > MIN_BUFFER_SIZE && batchSize < maxBatchSize);
     }
 
+    @Test
+    public void testScheduleWriteOperationWithBatching() throws 
ExecutionException, InterruptedException, TimeoutException {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each.
+        List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write #1 with two records.
+        CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(0, 2), 
"response1")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write1.isDone());
+
+        // A batch has been created.
+        assertNotNull(ctx.currentBatch);
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Write #2 with one record.
+        CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(2, 3), 
"response2")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write2.isDone());
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Write #3 with one record. This one cannot go into the existing batch

Review Comment:
   Correct. Each batch has a header so there is an overhead. Let me update the 
comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to