Re: [PR] KAFKA-17058; Extend CoordinatorRuntime to support non-atomic writes [kafka]
dajac merged PR #16498: URL: https://github.com/apache/kafka/pull/16498 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17058; Extend CoordinatorRuntime to support non-atomic writes [kafka]
jeffkbkim commented on code in PR #16498: URL: https://github.com/apache/kafka/pull/16498#discussion_r1664527321 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -3839,6 +3842,294 @@ public void close() {} assertEquals("response1", write.get(5, TimeUnit.SECONDS)); } +@Test +public void testScheduleNonAtomicWriteOperation() throws ExecutionException, InterruptedException, TimeoutException { +MockTimer timer = new MockTimer(); +MockPartitionWriter writer = new MockPartitionWriter(); + +CoordinatorRuntime runtime = +new CoordinatorRuntime.Builder() +.withTime(timer.time()) +.withTimer(timer) +.withDefaultWriteTimeOut(Duration.ofMillis(20)) +.withLoader(new MockCoordinatorLoader()) +.withEventProcessor(new DirectEventProcessor()) +.withPartitionWriter(writer) +.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) +.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) +.withSerializer(new StringSerializer()) +.withAppendLingerMs(10) +.build(); + +// Schedule the loading. +runtime.scheduleLoadOperation(TP, 10); + +// Verify the initial state. +CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); +assertEquals(0L, ctx.coordinator.lastWrittenOffset()); +assertEquals(0L, ctx.coordinator.lastCommittedOffset()); +assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); +assertNull(ctx.currentBatch); + +// Get the max batch size. +int maxBatchSize = writer.config(TP).maxMessageSize(); + +// Create records with a quarter of the max batch size each. Keep in mind that +// each batch has a header so it is not possible to have those four records +// in one single batch. 
+List records = Stream.of('1', '2', '3', '4').map(c -> { +char[] payload = new char[maxBatchSize / 4]; +Arrays.fill(payload, c); +return new String(payload); +}).collect(Collectors.toList()); + +// Let's try to write all the records atomically (the default) to ensure +// that it fails. +CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20), +state -> new CoordinatorResult<>(records, "write#1") +); + +assertFutureThrows(write1, RecordTooLargeException.class); + +// Let's try to write the same records non-atomically. +CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20), +state -> new CoordinatorResult<>(records, "write#2", null, true, false) +); + +// The write is pending. +assertFalse(write2.isDone()); + +// Verify the state. +assertNotNull(ctx.currentBatch); +// The last written offset is 3L because one batch was written to the log with +// the first three records. The 4th one is pending. +assertEquals(3L, ctx.coordinator.lastWrittenOffset()); +assertEquals(0L, ctx.coordinator.lastCommittedOffset()); +assertEquals(Arrays.asList(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); +assertEquals(Arrays.asList( +new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), +new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), +new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)), +new MockCoordinatorShard.RecordAndMetadata(3, records.get(3)) +), ctx.coordinator.coordinator().fullRecords()); +assertEquals(Collections.singletonList( +records(timer.time().milliseconds(), records.subList(0, 3)) +), writer.entries(TP)); + +// Commit up to 3L. +writer.commit(TP, 3L); + +// The write is still pending. +assertFalse(write2.isDone()); + +// Advance past the linger time to flush the pending batch. +timer.advanceClock(11); + +// Verify the state. 
+assertNull(ctx.currentBatch); +assertEquals(4L, ctx.coordinator.lastWrittenOffset()); +assertEquals(3L, ctx.coordinator.lastCommittedOffset()); +assertEquals(Arrays.asList(3L, 4L), ctx.coordinator.snapshotRegistry().epochsList()); +assertEquals(Arrays.asList( +new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), +new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), +new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)), +new MockCoordinatorShard.R
Re: [PR] KAFKA-17058; Extend CoordinatorRuntime to support non-atomic writes [kafka]
dajac commented on code in PR #16498: URL: https://github.com/apache/kafka/pull/16498#discussion_r1662970452 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -3839,6 +3842,209 @@ public void close() {} assertEquals("response1", write.get(5, TimeUnit.SECONDS)); } +@Test +public void testScheduleNonAtomicWriteOperation() throws ExecutionException, InterruptedException, TimeoutException { +MockTimer timer = new MockTimer(); +MockPartitionWriter writer = new MockPartitionWriter(); + +CoordinatorRuntime runtime = +new CoordinatorRuntime.Builder() +.withTime(timer.time()) +.withTimer(timer) +.withDefaultWriteTimeOut(Duration.ofMillis(20)) +.withLoader(new MockCoordinatorLoader()) +.withEventProcessor(new DirectEventProcessor()) +.withPartitionWriter(writer) +.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) +.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) +.withSerializer(new StringSerializer()) +.withAppendLingerMs(10) +.build(); + +// Schedule the loading. +runtime.scheduleLoadOperation(TP, 10); + +// Verify the initial state. +CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); +assertEquals(0L, ctx.coordinator.lastWrittenOffset()); +assertEquals(0L, ctx.coordinator.lastCommittedOffset()); +assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); +assertNull(ctx.currentBatch); + +// Get the max batch size. +int maxBatchSize = writer.config(TP).maxMessageSize(); + +// Create records with a quarter of the max batch size each. Keep in mind that +// each batch has a header so it is not possible to have those four records +// in one single batch. 
+List records = Stream.of('1', '2', '3', '4').map(c -> { +char[] payload = new char[maxBatchSize / 4]; +Arrays.fill(payload, c); +return new String(payload); +}).collect(Collectors.toList()); + +// Let's try to write all the records atomically (the default) to ensure +// that it fails. +CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20), +state -> new CoordinatorResult<>(records, "write#1") +); + +assertFutureThrows(write1, RecordTooLargeException.class); + +// Let's try to write the same records non-atomically. +CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20), +state -> new CoordinatorResult<>(records, "write#2", null, true, false) +); + +// The write is pending. +assertFalse(write2.isDone()); + +// Verify the state. +assertNotNull(ctx.currentBatch); +// The last written offset is 3L because one batch was written to the log with +// the first three records. The 4th one is pending. +assertEquals(3L, ctx.coordinator.lastWrittenOffset()); +assertEquals(0L, ctx.coordinator.lastCommittedOffset()); +assertEquals(Arrays.asList(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); +assertEquals(Arrays.asList( +new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), +new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), +new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)), +new MockCoordinatorShard.RecordAndMetadata(3, records.get(3)) +), ctx.coordinator.coordinator().fullRecords()); +assertEquals(Collections.singletonList( +records(timer.time().milliseconds(), records.subList(0, 3)) +), writer.entries(TP)); + +// Commit up to 3L. +writer.commit(TP, 3L); + +// The write is still pending. +assertFalse(write2.isDone()); + +// Advance past the linger time to flush the pending batch. +timer.advanceClock(11); + +// Verify the state. 
+assertNull(ctx.currentBatch); +assertEquals(4L, ctx.coordinator.lastWrittenOffset()); +assertEquals(3L, ctx.coordinator.lastCommittedOffset()); +assertEquals(Arrays.asList(3L, 4L), ctx.coordinator.snapshotRegistry().epochsList()); +assertEquals(Arrays.asList( +new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), +new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), +new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)), +new MockCoordinatorShard.Recor
Re: [PR] KAFKA-17058; Extend CoordinatorRuntime to support non-atomic writes [kafka]
jolshan commented on code in PR #16498: URL: https://github.com/apache/kafka/pull/16498#discussion_r1662961495 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -3839,6 +3842,209 @@ public void close() {} assertEquals("response1", write.get(5, TimeUnit.SECONDS)); } +@Test +public void testScheduleNonAtomicWriteOperation() throws ExecutionException, InterruptedException, TimeoutException { +MockTimer timer = new MockTimer(); +MockPartitionWriter writer = new MockPartitionWriter(); + +CoordinatorRuntime runtime = +new CoordinatorRuntime.Builder() +.withTime(timer.time()) +.withTimer(timer) +.withDefaultWriteTimeOut(Duration.ofMillis(20)) +.withLoader(new MockCoordinatorLoader()) +.withEventProcessor(new DirectEventProcessor()) +.withPartitionWriter(writer) +.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) +.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) +.withSerializer(new StringSerializer()) +.withAppendLingerMs(10) +.build(); + +// Schedule the loading. +runtime.scheduleLoadOperation(TP, 10); + +// Verify the initial state. +CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); +assertEquals(0L, ctx.coordinator.lastWrittenOffset()); +assertEquals(0L, ctx.coordinator.lastCommittedOffset()); +assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); +assertNull(ctx.currentBatch); + +// Get the max batch size. +int maxBatchSize = writer.config(TP).maxMessageSize(); + +// Create records with a quarter of the max batch size each. Keep in mind that +// each batch has a header so it is not possible to have those four records +// in one single batch. 
+List records = Stream.of('1', '2', '3', '4').map(c -> { +char[] payload = new char[maxBatchSize / 4]; +Arrays.fill(payload, c); +return new String(payload); +}).collect(Collectors.toList()); + +// Let's try to write all the records atomically (the default) to ensure +// that it fails. +CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20), +state -> new CoordinatorResult<>(records, "write#1") +); + +assertFutureThrows(write1, RecordTooLargeException.class); + +// Let's try to write the same records non-atomically. +CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20), +state -> new CoordinatorResult<>(records, "write#2", null, true, false) +); + +// The write is pending. +assertFalse(write2.isDone()); + +// Verify the state. +assertNotNull(ctx.currentBatch); +// The last written offset is 3L because one batch was written to the log with +// the first three records. The 4th one is pending. +assertEquals(3L, ctx.coordinator.lastWrittenOffset()); +assertEquals(0L, ctx.coordinator.lastCommittedOffset()); +assertEquals(Arrays.asList(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); +assertEquals(Arrays.asList( +new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), +new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), +new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)), +new MockCoordinatorShard.RecordAndMetadata(3, records.get(3)) +), ctx.coordinator.coordinator().fullRecords()); +assertEquals(Collections.singletonList( +records(timer.time().milliseconds(), records.subList(0, 3)) +), writer.entries(TP)); + +// Commit up to 3L. +writer.commit(TP, 3L); + +// The write is still pending. +assertFalse(write2.isDone()); + +// Advance past the linger time to flush the pending batch. +timer.advanceClock(11); + +// Verify the state. 
+assertNull(ctx.currentBatch); +assertEquals(4L, ctx.coordinator.lastWrittenOffset()); +assertEquals(3L, ctx.coordinator.lastCommittedOffset()); +assertEquals(Arrays.asList(3L, 4L), ctx.coordinator.snapshotRegistry().epochsList()); +assertEquals(Arrays.asList( +new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), +new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), +new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)), +new MockCoordinatorShard.Rec
Re: [PR] KAFKA-17058; Extend CoordinatorRuntime to support non-atomic writes [kafka]
dajac commented on PR #16498: URL: https://github.com/apache/kafka/pull/16498#issuecomment-2203203451 @jeffkbkim @dongnuo123 @jolshan Thanks for your comments. I addressed them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17058; Extend CoordinatorRuntime to support non-atomic writes [kafka]
dajac commented on code in PR #16498: URL: https://github.com/apache/kafka/pull/16498#discussion_r1662541019 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -936,62 +941,90 @@ private void append( )); } -// Compute the estimated size of the records. -int estimatedSize = AbstractRecords.estimateSizeInBytes( -currentBatch.builder.magic(), -compression.type(), -recordsToAppend -); +if (isAtomic) { Review Comment: > To confirm my understanding, we take the original path when the append should be atomic, which means we verify whether all records can fit in the current batch. If not, we allocate a new batch then append the records. If the append does not need to be atomic, we append individual records until we can fit no more, then allocate a new batch. > > This will reduce the number of writes to the log as we will have more filled batches on average. In practice, only the unload partition and cleanup group metadata jobs will have non-atomic writes today so we should not expect much impact. > > Is this correct? Yes. This is correct. However, I did not make this change to improve how batches are filled but rather to ensure that we would not be stuck in the case where we have a large number of records that could not fit in one batch. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -936,62 +941,90 @@ private void append( )); } -// Compute the estimated size of the records. -int estimatedSize = AbstractRecords.estimateSizeInBytes( -currentBatch.builder.magic(), -compression.type(), -recordsToAppend -); +if (isAtomic) { Review Comment: > One other thing I noticed is the ordering. For the atomic case, we create the batch first and then replay whereas non-atomic we do the replay then add to batch. Not sure if it makes a big difference though since we moved where we enqueue the event. This is a very good point and the code was wrong. 
I changed it to check if the record can fit in the batch before replaying. I added a test for this too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17058; Extend CoordinatorRuntime to support non-atomic writes [kafka]
dajac commented on code in PR #16498: URL: https://github.com/apache/kafka/pull/16498#discussion_r1662545695 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -936,62 +941,90 @@ private void append( )); } -// Compute the estimated size of the records. -int estimatedSize = AbstractRecords.estimateSizeInBytes( -currentBatch.builder.magic(), -compression.type(), -recordsToAppend -); +if (isAtomic) { +// Compute the estimated size of the records. +int estimatedSize = AbstractRecords.estimateSizeInBytes( +currentBatch.builder.magic(), +compression.type(), +recordsToAppend +); -// Check if the current batch has enough space. We check is before -// replaying the records in order to avoid having to revert back -// changes if the records do not fit within a batch. -if (estimatedSize > currentBatch.builder.maxAllowedBytes()) { -throw new RecordTooLargeException("Message batch size is " + estimatedSize + -" bytes in append to partition " + tp + " which exceeds the maximum " + -"configured size of " + currentBatch.maxBatchSize + "."); -} +// Check if the current batch has enough space. We check is before +// replaying the records in order to avoid having to revert back +// changes if the records do not fit within a batch. +if (estimatedSize > currentBatch.builder.maxAllowedBytes()) { +throw new RecordTooLargeException("Message batch size is " + estimatedSize + +" bytes in append to partition " + tp + " which exceeds the maximum " + +"configured size of " + currentBatch.maxBatchSize + "."); +} -if (!currentBatch.builder.hasRoomFor(estimatedSize)) { -// Otherwise, we write the current batch, allocate a new one and re-verify -// whether the records fit in it. -// If flushing fails, we don't catch the exception in order to let -// the caller fail the current operation. 
-flushCurrentBatch(); -maybeAllocateNewBatch( -producerId, -producerEpoch, -verificationGuard, -currentTimeMs -); +if (!currentBatch.builder.hasRoomFor(estimatedSize)) { +// Otherwise, we write the current batch, allocate a new one and re-verify +// whether the records fit in it. +// If flushing fails, we don't catch the exception in order to let +// the caller fail the current operation. +flushCurrentBatch(); +maybeAllocateNewBatch( +producerId, +producerEpoch, +verificationGuard, +currentTimeMs +); +} } -// Add the event to the list of pending events associated with the batch. -currentBatch.deferredEvents.add(event); - try { -// Apply record to the state machine. -if (replay) { -for (int i = 0; i < records.size(); i++) { -// We compute the offset of the record based on the last written offset. The -// coordinator is the single writer to the underlying partition so we can -// deduce it like this. +for (int i = 0; i < records.size(); i++) { +U recordToReplay = records.get(i); +SimpleRecord recordToAppend = recordsToAppend.get(i); + +if (replay) { coordinator.replay( -currentBatch.nextOffset + i, +currentBatch.nextOffset, producerId, producerEpoch, -records.get(i) +recordToReplay ); } -} -// Append to the batch. -for (SimpleRecord record : recordsToApp