dajac commented on code in PR #16215:
URL: https://github.com/apache/kafka/pull/16215#discussion_r1633229058


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -3059,6 +3166,497 @@ public void testAppendRecordBatchSize() {
         assertTrue(batchSize > MIN_BUFFER_SIZE && batchSize < maxBatchSize);
     }
 
+    @Test
+    public void testScheduleWriteOperationWithBatching() throws ExecutionException, InterruptedException, TimeoutException {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
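+        // With appendLingerMs=10, writes below accumulate in a shared batch that is
+        // flushed when it fills up or when the 10 ms linger expires.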
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each.
+        List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
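+        // Three such records plus batch overhead fit within maxBatchSize, so a
+        // fourth record should force the open batch to be flushed first.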
+
+        // Write #1 with two records.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(0, 2), "response1")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write1.isDone());
+
+        // A batch has been created.
+        assertNotNull(ctx.currentBatch);
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Write #2 with one record.
+        CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(2, 3), "response2")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write2.isDone());
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Write #3 with one record. This one cannot go into the existing batch
+        // so the existing batch should be flushed and a new one should be created.
+        CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(3, 4), "response3")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write3.isDone());
+
+        // Verify the state. Records are replayed. The previous batch
+        // got flushed with all the records but the new one from #3.
+        assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Arrays.asList(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Arrays.asList(
+            records(timer.time().milliseconds(), records.subList(0, 3))
+        ), writer.entries(TP));
+
+        // Advance past the linger time.
+        timer.advanceClock(11);
+
+        // Verify the state. The pending batch is flushed.
+        assertEquals(4L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Arrays.asList(0L, 3L, 4L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Arrays.asList(
+            records(timer.time().milliseconds() - 11, records.subList(0, 3)),
+            records(timer.time().milliseconds() - 11, records.subList(3, 4))
+        ), writer.entries(TP));
+
+        // Commit and verify that writes are completed.
+        writer.commit(TP);
+        assertTrue(write1.isDone());
+        assertTrue(write2.isDone());
+        assertTrue(write3.isDone());
+        assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+        assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
+        assertEquals("response3", write3.get(5, TimeUnit.SECONDS));
+    }
+
+    @Test
+    public void testScheduleWriteOperationWithBatchingWhenRecordsTooLarge() {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each.
+        List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
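+        // Together the four records exceed maxBatchSize once record overhead is
+        // included, so a single write containing all of them cannot be batched.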
+
+        // Write all the records.
+        CompletableFuture<String> write = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records, "response1")
+        );
+
+        assertFutureThrows(write, RecordTooLargeException.class);
+    }
+
+    @Test
+    public void testScheduleWriteOperationWithBatchingWhenWriteFails() {
+        MockTimer timer = new MockTimer();
+        // The partition writer accepts no writes.
+        MockPartitionWriter writer = new MockPartitionWriter(0);
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each.
+        List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write #1.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(0, 1), "response1"));
+
+        // Write #2.
+        CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(1, 2), "response2"));
+
+        // Write #3.
+        CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(2, 3), "response3"));
+
+        // Verify the state.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Write #4. This write cannot fit into the current batch, so the current
+        // batch is flushed. The flush fails, so we expect all writes to fail.
+        CompletableFuture<String> write4 = runtime.scheduleWriteOperation("write#4", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(3, 4), "response4"));
+
+        // Verify the futures.
+        assertFutureThrows(write1, KafkaException.class);
+        assertFutureThrows(write2, KafkaException.class);
+        assertFutureThrows(write3, KafkaException.class);
+        // Write #4 is also expected to fail.
+        assertFutureThrows(write4, KafkaException.class);
+
+        // Verify the state. The replayed records were reverted and nothing was written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Collections.emptyList(), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+    }
+
+    @Test
+    public void testScheduleWriteOperationWithBatchingWhenReplayFails() {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Override the coordinator with a coordinator that throws
+        // an exception when replay is called.
+        SnapshotRegistry snapshotRegistry = ctx.coordinator.snapshotRegistry();
+        ctx.coordinator = new SnapshottableCoordinator<>(
+            new LogContext(),
+            snapshotRegistry,
+            new MockCoordinatorShard(snapshotRegistry, ctx.timer) {
+                @Override
+                public void replay(
+                    long offset,
+                    long producerId,
+                    short producerEpoch,
+                    String record
+                ) throws RuntimeException {
+                    if (offset >= 1) {
+                        throw new IllegalArgumentException("error");
+                    }
+                    super.replay(
+                        offset,
+                        producerId,
+                        producerEpoch,
+                        record
+                    );
+                }
+            },
+            TP
+        );
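+        // With this override, only the record at offset 0 replays successfully; any
+        // later record fails replay, which should fail every write in the shared batch.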
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each.
+        List<String> records = Stream.of('1', '2').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write #1.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(0, 1), "response1"));
+
+        // Verify the state.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Write #2. It should fail.
+        CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(1, 2), "response2"));
+
+        // Verify the futures.
+        assertFutureThrows(write1, IllegalArgumentException.class);
+        assertFutureThrows(write2, IllegalArgumentException.class);
+
+        // Verify the state.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Collections.emptyList(), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+    }
+
+    @Test
+    public void testScheduleTransactionalWriteOperationWithBatching() throws ExecutionException, InterruptedException, TimeoutException {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Write #1 with one record.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(Collections.singletonList("record#1"), "response1")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write1.isDone());
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().pendingRecords(100L));
+        assertEquals(mkSet("record#1"), 
ctx.coordinator.coordinator().records());
+        assertEquals(Collections.emptyList(), writer.entries(TP));
+
+        // Transactional write #2 with one record. This will flush the current batch.
+        CompletableFuture<String> write2 = runtime.scheduleTransactionalWriteOperation(
+            "txn-write#1",
+            TP,
+            "transactional-id",
+            100L,
+            (short) 50,
+            Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(Collections.singletonList("record#2"), "response2"),
+            TXN_OFFSET_COMMIT_LATEST_VERSION
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write2.isDone());
+
+        // Verify the state. Records are replayed but no batch written.

Review Comment:
   Correct. Let me fix the comment.


