Re: [PR] KAFKA-17058; Extend CoordinatorRuntime to support non-atomic writes [kafka]

2024-07-04 Thread via GitHub


dajac merged PR #16498:
URL: https://github.com/apache/kafka/pull/16498


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17058; Extend CoordinatorRuntime to support non-atomic writes [kafka]

2024-07-03 Thread via GitHub


jeffkbkim commented on code in PR #16498:
URL: https://github.com/apache/kafka/pull/16498#discussion_r1664527321


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -3839,6 +3842,294 @@ public void close() {}
 assertEquals("response1", write.get(5, TimeUnit.SECONDS));
 }
 
+    @Test
+    public void testScheduleNonAtomicWriteOperation() throws ExecutionException, InterruptedException, TimeoutException {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each. Keep in mind that
+        // each batch has a header so it is not possible to have those four records
+        // in one single batch.
+        List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Let's try to write all the records atomically (the default) to ensure
+        // that it fails.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records, "write#1")
+        );
+
+        assertFutureThrows(write1, RecordTooLargeException.class);
+
+        // Let's try to write the same records non-atomically.
+        CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records, "write#2", null, true, false)
+        );
+
+        // The write is pending.
+        assertFalse(write2.isDone());
+
+        // Verify the state.
+        assertNotNull(ctx.currentBatch);
+        // The last written offset is 3L because one batch was written to the log with
+        // the first three records. The 4th one is pending.
+        assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Arrays.asList(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.singletonList(
+            records(timer.time().milliseconds(), records.subList(0, 3))
+        ), writer.entries(TP));
+
+        // Commit up to 3L.
+        writer.commit(TP, 3L);
+
+        // The write is still pending.
+        assertFalse(write2.isDone());
+
+        // Advance past the linger time to flush the pending batch.
+        timer.advanceClock(11);
+
+        // Verify the state.
+        assertNull(ctx.currentBatch);
+        assertEquals(4L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(3L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Arrays.asList(3L, 4L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.R

Re: [PR] KAFKA-17058; Extend CoordinatorRuntime to support non-atomic writes [kafka]

2024-07-02 Thread via GitHub


dajac commented on code in PR #16498:
URL: https://github.com/apache/kafka/pull/16498#discussion_r1662970452


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -3839,6 +3842,209 @@ public void close() {}
 assertEquals("response1", write.get(5, TimeUnit.SECONDS));
 }
 
+    @Test
+    public void testScheduleNonAtomicWriteOperation() throws ExecutionException, InterruptedException, TimeoutException {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each. Keep in mind that
+        // each batch has a header so it is not possible to have those four records
+        // in one single batch.
+        List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Let's try to write all the records atomically (the default) to ensure
+        // that it fails.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records, "write#1")
+        );
+
+        assertFutureThrows(write1, RecordTooLargeException.class);
+
+        // Let's try to write the same records non-atomically.
+        CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records, "write#2", null, true, false)
+        );
+
+        // The write is pending.
+        assertFalse(write2.isDone());
+
+        // Verify the state.
+        assertNotNull(ctx.currentBatch);
+        // The last written offset is 3L because one batch was written to the log with
+        // the first three records. The 4th one is pending.
+        assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Arrays.asList(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.singletonList(
+            records(timer.time().milliseconds(), records.subList(0, 3))
+        ), writer.entries(TP));
+
+        // Commit up to 3L.
+        writer.commit(TP, 3L);
+
+        // The write is still pending.
+        assertFalse(write2.isDone());
+
+        // Advance past the linger time to flush the pending batch.
+        timer.advanceClock(11);
+
+        // Verify the state.
+        assertNull(ctx.currentBatch);
+        assertEquals(4L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(3L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Arrays.asList(3L, 4L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.Recor

Re: [PR] KAFKA-17058; Extend CoordinatorRuntime to support non-atomic writes [kafka]

2024-07-02 Thread via GitHub


jolshan commented on code in PR #16498:
URL: https://github.com/apache/kafka/pull/16498#discussion_r1662961495


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -3839,6 +3842,209 @@ public void close() {}
 assertEquals("response1", write.get(5, TimeUnit.SECONDS));
 }
 
+    @Test
+    public void testScheduleNonAtomicWriteOperation() throws ExecutionException, InterruptedException, TimeoutException {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(10)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each. Keep in mind that
+        // each batch has a header so it is not possible to have those four records
+        // in one single batch.
+        List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Let's try to write all the records atomically (the default) to ensure
+        // that it fails.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records, "write#1")
+        );
+
+        assertFutureThrows(write1, RecordTooLargeException.class);
+
+        // Let's try to write the same records non-atomically.
+        CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records, "write#2", null, true, false)
+        );
+
+        // The write is pending.
+        assertFalse(write2.isDone());
+
+        // Verify the state.
+        assertNotNull(ctx.currentBatch);
+        // The last written offset is 3L because one batch was written to the log with
+        // the first three records. The 4th one is pending.
+        assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Arrays.asList(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(Collections.singletonList(
+            records(timer.time().milliseconds(), records.subList(0, 3))
+        ), writer.entries(TP));
+
+        // Commit up to 3L.
+        writer.commit(TP, 3L);
+
+        // The write is still pending.
+        assertFalse(write2.isDone());
+
+        // Advance past the linger time to flush the pending batch.
+        timer.advanceClock(11);
+
+        // Verify the state.
+        assertNull(ctx.currentBatch);
+        assertEquals(4L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(3L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Arrays.asList(3L, 4L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(Arrays.asList(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.Rec

Re: [PR] KAFKA-17058; Extend CoordinatorRuntime to support non-atomic writes [kafka]

2024-07-02 Thread via GitHub


dajac commented on PR #16498:
URL: https://github.com/apache/kafka/pull/16498#issuecomment-2203203451

   @jeffkbkim @dongnuo123 @jolshan Thanks for your comments. I addressed them.





Re: [PR] KAFKA-17058; Extend CoordinatorRuntime to support non-atomic writes [kafka]

2024-07-02 Thread via GitHub


dajac commented on code in PR #16498:
URL: https://github.com/apache/kafka/pull/16498#discussion_r1662541019


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -936,62 +941,90 @@ private void append(
 ));
 }
 
-            // Compute the estimated size of the records.
-            int estimatedSize = AbstractRecords.estimateSizeInBytes(
-                currentBatch.builder.magic(),
-                compression.type(),
-                recordsToAppend
-            );
+            if (isAtomic) {

Review Comment:
   > To confirm my understanding, we take the original path when the append
   > should be atomic, which means we verify whether all records can fit in
   > the current batch. If not, we allocate a new batch then append the
   > records. If the append does not need to be atomic, we append individual
   > records until we can fit no more, then allocate a new batch.
   >
   > This will reduce the number of writes to the log as we will have more
   > filled batches on average. In practice, only the unload partition and
   > cleanup group metadata jobs will have non-atomic writes today, so we
   > should not expect much impact.
   >
   > Is this correct?

   Yes, this is correct. However, I did not make this change to improve how
   batches are filled, but rather to ensure that we would not get stuck in
   the case where we have a large number of records that could not fit in
   one batch.
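
As an illustration of the non-atomic path described above, here is a minimal, hypothetical sketch. It is not the CoordinatorRuntime implementation; names such as NonAtomicAppendSketch, Batch, and appendNonAtomic are invented for this example. Records are appended one at a time, and the in-progress batch is flushed and replaced whenever the next record would not fit.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class NonAtomicAppendSketch {
    // Maximum size of a single batch, analogous to the max batch size above.
    private static final int MAX_BATCH_SIZE = 1024;

    // A filled or in-progress batch of serialized records.
    static final class Batch {
        final List<String> records = new ArrayList<>();
        int sizeInBytes = 0;
    }

    private final List<Batch> flushedBatches = new ArrayList<>();
    private Batch currentBatch = new Batch();

    // Appends records one by one; flushes the current batch whenever the next
    // record does not fit, instead of rejecting the whole write up front.
    public void appendNonAtomic(List<String> records) {
        for (String record : records) {
            int recordSize = record.getBytes(StandardCharsets.UTF_8).length;
            if (currentBatch.sizeInBytes + recordSize > MAX_BATCH_SIZE) {
                // The batch is full: flush it and start a new one.
                flushedBatches.add(currentBatch);
                currentBatch = new Batch();
            }
            currentBatch.records.add(record);
            currentBatch.sizeInBytes += recordSize;
        }
    }

    public List<Batch> flushedBatches() {
        return flushedBatches;
    }
}
```

An atomic write, by contrast, estimates the size of all records up front and fails the whole operation (RecordTooLargeException) if they cannot fit in a single batch.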



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -936,62 +941,90 @@ private void append(
 ));
 }
 
-            // Compute the estimated size of the records.
-            int estimatedSize = AbstractRecords.estimateSizeInBytes(
-                currentBatch.builder.magic(),
-                compression.type(),
-                recordsToAppend
-            );
+            if (isAtomic) {

Review Comment:
   > One other thing I noticed is the ordering. For the atomic case, we
   > create the batch first and then replay, whereas in the non-atomic case
   > we do the replay and then add to the batch. Not sure if it makes a big
   > difference though since we moved where we enqueue the event.

   This is a very good point and the code was wrong. I changed it to check
   whether the record can fit in the batch before replaying it. I added a
   test for this too.





Re: [PR] KAFKA-17058; Extend CoordinatorRuntime to support non-atomic writes [kafka]

2024-07-02 Thread via GitHub


dajac commented on code in PR #16498:
URL: https://github.com/apache/kafka/pull/16498#discussion_r1662545695


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -936,62 +941,90 @@ private void append(
 ));
 }
 
-            // Compute the estimated size of the records.
-            int estimatedSize = AbstractRecords.estimateSizeInBytes(
-                currentBatch.builder.magic(),
-                compression.type(),
-                recordsToAppend
-            );
+            if (isAtomic) {
+                // Compute the estimated size of the records.
+                int estimatedSize = AbstractRecords.estimateSizeInBytes(
+                    currentBatch.builder.magic(),
+                    compression.type(),
+                    recordsToAppend
+                );
 
-            // Check if the current batch has enough space. We check is before
-            // replaying the records in order to avoid having to revert back
-            // changes if the records do not fit within a batch.
-            if (estimatedSize > currentBatch.builder.maxAllowedBytes()) {
-                throw new RecordTooLargeException("Message batch size is " + estimatedSize +
-                    " bytes in append to partition " + tp + " which exceeds the maximum " +
-                    "configured size of " + currentBatch.maxBatchSize + ".");
-            }
+                // Check if the current batch has enough space. We check is before
+                // replaying the records in order to avoid having to revert back
+                // changes if the records do not fit within a batch.
+                if (estimatedSize > currentBatch.builder.maxAllowedBytes()) {
+                    throw new RecordTooLargeException("Message batch size is " + estimatedSize +
+                        " bytes in append to partition " + tp + " which exceeds the maximum " +
+                        "configured size of " + currentBatch.maxBatchSize + ".");
+                }
 
-            if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
-                // Otherwise, we write the current batch, allocate a new one and re-verify
-                // whether the records fit in it.
-                // If flushing fails, we don't catch the exception in order to let
-                // the caller fail the current operation.
-                flushCurrentBatch();
-                maybeAllocateNewBatch(
-                    producerId,
-                    producerEpoch,
-                    verificationGuard,
-                    currentTimeMs
-                );
+                if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
+                    // Otherwise, we write the current batch, allocate a new one and re-verify
+                    // whether the records fit in it.
+                    // If flushing fails, we don't catch the exception in order to let
+                    // the caller fail the current operation.
+                    flushCurrentBatch();
+                    maybeAllocateNewBatch(
+                        producerId,
+                        producerEpoch,
+                        verificationGuard,
+                        currentTimeMs
+                    );
+                }
             }
 
-            // Add the event to the list of pending events associated with the batch.
-            currentBatch.deferredEvents.add(event);
-
             try {
-                // Apply record to the state machine.
-                if (replay) {
-                    for (int i = 0; i < records.size(); i++) {
-                        // We compute the offset of the record based on the last written offset. The
-                        // coordinator is the single writer to the underlying partition so we can
-                        // deduce it like this.
+                for (int i = 0; i < records.size(); i++) {
+                    U recordToReplay = records.get(i);
+                    SimpleRecord recordToAppend = recordsToAppend.get(i);
+
+                    if (replay) {
                         coordinator.replay(
-                            currentBatch.nextOffset + i,
+                            currentBatch.nextOffset,
                             producerId,
                             producerEpoch,
-                            records.get(i)
+                            recordToReplay
                         );
                     }
-                }
 
-                // Append to the batch.
-                for (SimpleRecord record : recordsToApp