junrao commented on code in PR #20285:
URL: https://github.com/apache/kafka/pull/20285#discussion_r2407819045
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -237,6 +238,20 @@ public boolean completeExceptionally(
return done(ProduceResponse.INVALID_OFFSET, RecordBatch.NO_TIMESTAMP,
topLevelException, recordExceptions);
}
+ /**
+ * Get all record futures for this batch.
+ * This is used by flush() to wait on individual records rather than the batch-level future.
+ * When batches are split, individual futures are chained to the new batches,
+ * ensuring flush() waits for all split batches to complete.
Review Comment:
ensuring flush() waits => ensuring that flush() waits
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1072,12 +1073,25 @@ private boolean appendsInProgress() {
*/
public void awaitFlushCompletion() throws InterruptedException {
try {
- // Obtain a copy of all of the incomplete ProduceRequestResult(s) at the time of the flush.
- // We must be careful not to hold a reference to the ProduceBatch(s) so that garbage
- // collection can occur on the contents.
- // The sender will remove ProducerBatch(s) from the original incomplete collection.
- for (ProduceRequestResult result : this.incomplete.requestResults())
- result.await();
+ // Obtain a snapshot of all record futures at the time of the flush.
+ // We wait on individual record futures rather than batch-level futures because
+ // by waiting on record futures, we ensure flush() blocks until all split
+ // batches complete.
+ //
+ // We first collect all futures into a list first to avoid holding references to
+ // ProducerBatch objects, allowing them to be garbage collected after completion.
+ List<FutureRecordMetadata> futures = new ArrayList<>();
+ for (ProducerBatch batch : this.incomplete.copyAll()) {
Review Comment:
incomplete.requestResults() is no longer used?
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -1072,12 +1073,25 @@ private boolean appendsInProgress() {
*/
public void awaitFlushCompletion() throws InterruptedException {
try {
- // Obtain a copy of all of the incomplete ProduceRequestResult(s) at the time of the flush.
- // We must be careful not to hold a reference to the ProduceBatch(s) so that garbage
- // collection can occur on the contents.
- // The sender will remove ProducerBatch(s) from the original incomplete collection.
- for (ProduceRequestResult result : this.incomplete.requestResults())
- result.await();
+ // Obtain a snapshot of all record futures at the time of the flush.
+ // We wait on individual record futures rather than batch-level futures because
+ // by waiting on record futures, we ensure flush() blocks until all split
+ // batches complete.
+ //
+ // We first collect all futures into a list first to avoid holding references to
Review Comment:
"first" is used twice in the same sentence.
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -1066,6 +1066,146 @@ public void testSplitAndReenqueue() throws ExecutionException, InterruptedExcept
assertEquals(1, future2.get().offset());
}
+ // This test confirms us that splitting a single large record
+ // creates an unsplittable batch (does not really split it)
+ // that will continue to fail with MESSAGE_TOO_LARGE,
+ // causing infinite retry loops
+ @Test
+ public void testSplitAndReenqueueWithSingleLargeRecord() throws ExecutionException, InterruptedException {
+ long now = time.milliseconds();
+ int smallBatchSize = 1024;
+ RecordAccumulator accum = createTestRecordAccumulator(smallBatchSize, 10 * 1024, Compression.NONE, 10);
+
+ // create a single record that is much larger than the batch size limit
+ // we are trying to mimic by send a record larger than broker's message.max.bytes
+ byte[] largeValue = new byte[4 * 1024]; // 4KB > 1KB
+
+ // Create a buffer with enough space for the large record
+ ByteBuffer buffer = ByteBuffer.allocate(8192);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 0L);
+ ProducerBatch batch = new ProducerBatch(tp1, builder, now, true);
+
+ final AtomicInteger acked = new AtomicInteger(0);
+ Callback cb = (metadata, exception) -> acked.incrementAndGet();
+
+ // create a large batch but only with one single record
+ Future<RecordMetadata> future = batch.tryAppend(now, key, largeValue, Record.EMPTY_HEADERS, cb, now);
+ assertNotNull(future, "Should be able to append the large record to batch");
+ assertEquals(1, batch.recordCount, "Batch should contain exactly one record");
+ batch.close();
+
+ // try to split and reenqueue a single large record
+ SplitAndReenqueueResult result = performSplitAndReenqueueCycle(accum, batch, 0);
+
+ // The below asserts tests that the single large record
+ // results in exactly one "split" batch
+ assertEquals(1, result.numSplitBatches, "Single large record should result in exactly one split batch");
+ assertEquals(1, result.originalRecordCount, "Original batch should have exactly one record");
+ assertEquals(1, result.splitBatch.recordCount, "Split batch should still contain exactly one record");
+ assertTrue(result.originalBatchSize > smallBatchSize, "Original batch should exceed batch size limit");
+
+ // the "split" batch is still oversized and contains the same record
+ assertTrue(result.splitBatch.estimatedSizeInBytes() > smallBatchSize,
+ "Split batch is still oversized - it cannot be split further and will cause an error, will retry infinitely");
+ }
+
+ // This test retries for infinite times (controlled for 5 times for testing)
+ // because the record can never be split further
+ @Test
+ public void testRetrySplitAndReenqueueBehaviourWithSingleLargeRecord() throws ExecutionException, InterruptedException {
+ long now = time.milliseconds();
+ int smallBatchSize = 1024;
+ RecordAccumulator accum = createTestRecordAccumulator(smallBatchSize, 10 * 1024, Compression.NONE, 10);
+
+ // create a single record that is much larger than the batch size limit
+ // we are trying to mimic by send a record larger than broker's message.max.bytes
+ byte[] largeValue = new byte[4 * 1024]; // 4KB > 1KB
+
+ // Create a buffer with enough space for the large record
+ ByteBuffer buffer = ByteBuffer.allocate(8192);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 0L);
+ ProducerBatch originalBatch = new ProducerBatch(tp1, builder, now, true);
+
+ final AtomicInteger acked = new AtomicInteger(0);
+ Callback cb = (metadata, exception) -> acked.incrementAndGet();
+
+ Future<RecordMetadata> future = originalBatch.tryAppend(now, key, largeValue, Record.EMPTY_HEADERS, cb, now);
+ assertNotNull(future, "Should be able to append the large record to batch");
+ assertEquals(1, originalBatch.recordCount, "Original batch should contain exactly one record");
+ originalBatch.close();
+
+ // controlled test case, retry behavior across multiple cycles
+ // 5 cycles for testing but mimics infinite retries in reality
+ final int maxRetryCycles = 5;
+
+ ProducerBatch currentBatch = originalBatch;
+ List<SplitAndReenqueueResult> results = new ArrayList<>();
+
+ for (int retryAttempt = 0; retryAttempt < maxRetryCycles; retryAttempt++) {
+ SplitAndReenqueueResult result = performSplitAndReenqueueCycle(accum, currentBatch, retryAttempt);
+ results.add(result);
+
+ // Verify that each retry produces exactly 1 "split" batch (cannot be split further)
+ assertEquals(1, result.numSplitBatches, "Single record should result in exactly one split batch in retry attempt " + retryAttempt);
+ assertEquals(1, result.originalRecordCount, "Original batch should have exactly one record in retry attempt " + retryAttempt);
+ assertTrue(result.originalBatchSize > smallBatchSize, "Original batch should exceed size limit in retry attempt " + retryAttempt);
+ assertEquals(1, result.splitBatch.recordCount, "Split batch should still contain exactly one record in retry attempt " + retryAttempt);
+
+ // The split batch is still oversized and will fail with MESSAGE_TOO_LARGE again
+ assertTrue(result.splitBatch.estimatedSizeInBytes() > smallBatchSize,
+ "Split batch in retry " + retryAttempt + " is still oversized and will fail MESSAGE_TOO_LARGE again");
+
+ // the new batch must be the split batch
+ currentBatch = result.splitBatch;
+ }
+
+ // making sure that all the retry attempts were tracked
+ assertEquals(maxRetryCycles, results.size(), "Should have tracked all retry attempts");
+
+ // consistency across all retry cycles - each produces exactly 1 unsplittable batch
+ for (int i = 0; i < maxRetryCycles; i++) {
+ SplitAndReenqueueResult result = results.get(i);
+ assertEquals(1, result.numSplitBatches, "Retry attempt " + i + " should produce exactly 1 split batch");
+ assertEquals(1, result.originalRecordCount, "Retry attempt " + i + " should have exactly 1 record");
+ assertTrue(result.originalBatchSize > smallBatchSize, "Retry attempt " + i + " batch should exceed size limit");
+ }
+ }
+
+ // here I am testing the hasRoomFor() behaviour
+ // It allows the first record no matter the size
+ // but does not allow the second record
Review Comment:
This test is not useful. By design, we always allow the first record on a
new batch.
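For context, a simplified, illustrative model of that behaviour (not the actual MemoryRecordsBuilder code; the class and parameter names below are made up): the size check only applies once a batch already holds a record, so the very first record is admitted no matter how large it is.

```java
// Illustrative sketch only, not the real room check: the first record is
// always admitted so that a record larger than batch.size can still be sent
// as a batch of one; only later appends are size-checked.
final class RoomCheckSketch {
    private RoomCheckSketch() { }

    static boolean hasRoomFor(final int numRecords, final int bytesWritten,
                              final int recordSize, final int batchSizeLimit) {
        if (numRecords == 0) {
            return true;                                     // first record always fits
        }
        return bytesWritten + recordSize <= batchSizeLimit;  // later records respect the limit
    }

    public static void main(final String[] args) {
        System.out.println(hasRoomFor(0, 0, 4096, 1024));    // true: first record, even at 4 KB > 1 KB limit
        System.out.println(hasRoomFor(1, 4096, 100, 1024));  // false: batch is already over the limit
    }
}
```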
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -237,6 +238,20 @@ public boolean completeExceptionally(
return done(ProduceResponse.INVALID_OFFSET, RecordBatch.NO_TIMESTAMP,
topLevelException, recordExceptions);
}
+ /**
+ * Get all record futures for this batch.
+ * This is used by flush() to wait on individual records rather than the batch-level future.
+ * When batches are split, individual futures are chained to the new batches,
+ * ensuring flush() waits for all split batches to complete.
+ *
+ * @return List of FutureRecordMetadata for all records in this batch
+ */
+ public List<FutureRecordMetadata> recordFutures() {
+ return thunks.stream()
+ .map(thunk -> thunk.future)
Review Comment:
The downside of this approach is the potential overhead due to record-level
checking. Could you do a perf test to compare the flush() time with and without
this PR, with, say, a few thousand pending records?
Another option is to also chain ProduceRequestResult when splitting the
batches. This allows us to check the status at the request level in the flush()
call.
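For illustration, a minimal sketch of that request-level alternative, assuming a hypothetical chaining hook (ProduceRequestResult has no such API today; all names below are made up):

```java
// Hypothetical sketch only: the idea is that when a batch is split, the parent
// result records the child results, and awaiting the parent transitively awaits
// every child, so flush() can keep waiting per request instead of per record.
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

final class ChainableRequestResult {
    private final CountDownLatch latch = new CountDownLatch(1);
    private final List<ChainableRequestResult> children = new CopyOnWriteArrayList<>();

    // Called once per split batch (a hypothetical hook in ProducerBatch#split).
    void chain(final ChainableRequestResult child) {
        children.add(child);
    }

    // Called when the request for this batch completes (mirrors done()).
    void complete() {
        latch.countDown();
    }

    // awaitFlushCompletion() would call this on each incomplete result.
    void awaitChained() throws InterruptedException {
        latch.await();
        for (final ChainableRequestResult child : children) {
            child.awaitChained();
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        final ChainableRequestResult parent = new ChainableRequestResult();
        final ChainableRequestResult child = new ChainableRequestResult();
        parent.chain(child);   // done by the hypothetical split hook
        parent.complete();     // original request finished (e.g. MESSAGE_TOO_LARGE)
        child.complete();      // split batch finished later
        parent.awaitChained(); // returns only after both have completed
        System.out.println("flush would unblock here");
    }
}
```

With something along these lines, awaitFlushCompletion() would still await one result per batch, and a split would simply chain the child results instead of requiring flush() to track every record future.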
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##########
@@ -1062,6 +1062,146 @@ public void testSplitAndReenqueue() throws ExecutionException, InterruptedExcept
assertEquals(1, future2.get().offset());
}
+ // This test confirms us that splitting a single large record
Review Comment:
This test is not useful since we won't call split if a batch contains only 1
record. Ditto for testRetrySplitAndReenqueueBehaviourWithSingleLargeRecord().
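For reference, the pre-condition being described, expressed as a small hypothetical helper rather than a quote of Sender#completeBatch: a batch is only split when it holds more than one record, because a single oversized record cannot get any smaller by splitting.

```java
// Hypothetical helper mirroring the pre-condition described above; not the
// actual Sender code, just the rule it enforces.
final class SplitPolicySketch {
    private SplitPolicySketch() { }

    static boolean worthSplitting(final int recordCount) {
        return recordCount > 1;
    }

    public static void main(final String[] args) {
        System.out.println(worthSplitting(1)); // false: a single-record batch is never split
        System.out.println(worthSplitting(5)); // true: a multi-record batch can be split further
    }
}
```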
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -237,6 +238,20 @@ public boolean completeExceptionally(
return done(ProduceResponse.INVALID_OFFSET, RecordBatch.NO_TIMESTAMP,
topLevelException, recordExceptions);
}
+ /**
+ * Get all record futures for this batch.
+ * This is used by flush() to wait on individual records rather than the batch-level future.
+ * When batches are split, individual futures are chained to the new batches,
Review Comment:
individual futures => individual record futures
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AtLeastOnceDeliveryMessageLossIntegrationTest.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.internals.Sender;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.DEFAULT_TIMEOUT;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
+import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
+import static org.apache.kafka.test.TestUtils.consumerConfig;
+import static org.apache.kafka.test.TestUtils.producerConfig;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@Timeout(600)
+@Tag("integration")
+public class AtLeastOnceDeliveryMessageLossIntegrationTest {
+ private static final Logger log = LoggerFactory.getLogger(AtLeastOnceDeliveryMessageLossIntegrationTest.class);
+
+ private static final int NUM_BROKERS = 1;
+ private static final int LARGE_RECORD_COUNT = 50000;
+ private static final int SMALL_RECORD_COUNT = 40000;
+
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+ @BeforeAll
+ public static void startCluster() throws IOException {
+ CLUSTER.start();
+ }
+
+ @AfterAll
+ public static void closeCluster() {
+ CLUSTER.stop();
+ }
+
+ private String applicationId;
+ private String inputTopic;
+ private String outputTopic;
+ private Properties streamsConfiguration;
+ private KafkaStreams kafkaStreams;
+
+ @BeforeEach
+ public void setUp(final TestInfo testInfo) throws Exception {
+ final String testId = safeUniqueTestName(testInfo);
+ applicationId = "app-" + testId;
+ inputTopic = "input-" + testId;
+ outputTopic = "output-" + testId;
+
+ cleanStateBeforeTest(CLUSTER, inputTopic, outputTopic);
+ CLUSTER.createTopics(inputTopic, outputTopic);
+
+ setupStreamsConfiguration();
+ }
+
+ @AfterEach
+ public void cleanUp() throws Exception {
+ if (kafkaStreams != null) {
+ kafkaStreams.close();
+ }
+ if (streamsConfiguration != null) {
+ purgeLocalStreamsState(streamsConfiguration);
+ }
+ }
+
+ // failing test
+ @Test
+ public void shouldNotCommitOffsetsAndNotProduceOutputRecordsWhenProducerFailsWithMessageTooLarge() throws Exception {
+
+ try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(Sender.class)) {
+ produceInputData(LARGE_RECORD_COUNT);
+
+ kafkaStreams = createStreamsApplication();
+ startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), Duration.ofMillis(DEFAULT_TIMEOUT));
+
+ waitForProcessingAndCommit();
+
+ // for this bug
+ // first offsets are committed, then
+ // no messages produced in output topic, then
+ // repeated retries and MESSAGE_TOO_LARGE error
+
+ assertTrue(appender.getMessages().stream()
+ .anyMatch(msg -> msg.contains("MESSAGE_TOO_LARGE") && msg.contains("splitting and retrying")),
+ "Should log MESSAGE_TOO_LARGE and splitting retry messages");
+
+ final int outputRecordCount = verifyOutputRecords(0); // should not produce records
+ final boolean offsetsCommitted = verifyConsumerOffsetsCommitted(0); // should not commit offset unless records are produced
+
+ assertEquals(0, outputRecordCount, "Output topic should not have any records");
+ assertTrue(offsetsCommitted, "Consumer offsets should not be committed");
+ }
+ }
+
+ @Test
+ public void shouldCommitOffsetsAndProduceMessagesNormallyForSmallerRecordCount() throws Exception {
+ produceInputData(SMALL_RECORD_COUNT);
+
+ try (final KafkaStreams kafkaStreams = createStreamsApplication()) {
+ startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), Duration.ofMillis(DEFAULT_TIMEOUT));
+
+ waitForProcessingAndCommit();
+
+ //normal behavior
+ final int outputRecordCount = verifyOutputRecords(SMALL_RECORD_COUNT); //should produce records
+ final boolean offsetsCommitted = verifyConsumerOffsetsCommitted(SMALL_RECORD_COUNT); // should commit offsets
+
+ assertEquals(SMALL_RECORD_COUNT, outputRecordCount, "Output topic should have " + SMALL_RECORD_COUNT + " records");
+ assertTrue(offsetsCommitted, "Consumer offsets should be committed");
+ }
+ }
+
+
+ private void setupStreamsConfiguration() {
+ streamsConfiguration = new Properties();
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+ streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+
+ // AT_LEAST_ONCE processing guarantee
+ streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);
+ streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000L);
+
+ // Producer configuration that can trigger MESSAGE_TOO_LARGE errors
+ streamsConfiguration.put(ProducerConfig.LINGER_MS_CONFIG, 300000);
+ streamsConfiguration.put(ProducerConfig.BATCH_SIZE_CONFIG, 33554432);
+ }
+
+ private void produceInputData(final int recordCount) throws Exception {
Review Comment:
This method doesn't throw Exception.