This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 95947d2f581 KAFKA-17299: add unit tests for previous fix (#17919)
95947d2f581 is described below
commit 95947d2f581cce85c25ef9eb3501dc13d09ae05a
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Nov 25 12:03:57 2024 -0800
KAFKA-17299: add unit tests for previous fix (#17919)
https://github.com/apache/kafka/pull/17899 fixed the issue, but did not
add any unit tests.
Reviewers: Bill Bejeck <[email protected]>
---
.../processor/internals/PartitionGroup.java | 3 +-
.../processor/internals/PartitionGroupTest.java | 82 ++++++++++++++++++
.../processor/internals/StreamTaskTest.java | 96 ++++++++++++++++++----
3 files changed, 164 insertions(+), 17 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 5e57efb9628..5fb313ff605 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -250,10 +250,11 @@ class PartitionGroup extends AbstractPartitionGroup {
if (queue != null) {
// get the first record from this queue.
+ final int oldSize = queue.size();
record = queue.poll(wallClockTime);
if (record != null) {
- --totalBuffered;
+ totalBuffered -= oldSize - queue.size();
if (queue.isEmpty()) {
// if a certain queue has been drained, reset the flag
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 1f4fec19484..95a5210ae34 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -606,6 +606,88 @@ public class PartitionGroupTest {
assertThat(group.nextRecord(new RecordInfo(), time.milliseconds()),
nullValue()); // all available records removed
}
+    @Test
+    public void shouldUpdateBufferSizeCorrectlyForSkippedRecords() {
+        final PartitionGroup group = new PartitionGroup(
+            logContext,
+            mkMap(mkEntry(partition1, queue1)),
+            tp -> OptionalLong.of(0L),
+            getValueSensor(metrics, lastLatenessValue),
+            enforcedProcessingSensor,
+            maxTaskIdleMs
+        );
+        final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
+            new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue),
+            new ConsumerRecord<>(
+                "topic",
+                1,
+                -1, // offset as invalid timestamp
+                -1, // invalid timestamp
+                TimestampType.CREATE_TIME,
+                0,
+                0,
+                recordKey,
+                recordValue,
+                new RecordHeaders(),
+                Optional.empty()
+            ),
+            new ConsumerRecord<>(
+                "topic",
+                1,
+                11,
+                0,
+                TimestampType.CREATE_TIME,
+                0,
+                0,
+                new byte[0], // corrupted key
+                recordValue,
+                new RecordHeaders(),
+                Optional.empty()
+            ),
+            new ConsumerRecord<>(
+                "topic",
+                1,
+                -1, // offset as invalid timestamp
+                -1, // invalid timestamp
+                TimestampType.CREATE_TIME,
+                0,
+                0,
+                recordKey,
+                recordValue,
+                new RecordHeaders(),
+                Optional.empty()
+            ),
+            new ConsumerRecord<>(
+                "topic",
+                1,
+                13,
+                0,
+                TimestampType.CREATE_TIME,
+                0,
+                0,
+                recordKey,
+                new byte[0], // corrupted value
+                new RecordHeaders(),
+                Optional.empty()
+            ),
+            new ConsumerRecord<>("topic", 1, 20L, recordKey, recordValue)
+        );
+        // 7 raw records: 3 valid (offsets 1, 5, 20) and 4 to be skipped (invalid timestamp / corrupted key or value)
+        group.addRawRecords(partition1, list1);
+        assertEquals(7, group.numBuffered());
+
+        assertEquals(1L, group.nextRecord(new RecordInfo(), time.milliseconds()).offset());
+        assertEquals(6, group.numBuffered());
+
+        // returns offset 5 and drops the 4 skipped records behind it, so the count must fall by 5 (6 -> 1), not by 1
+        assertEquals(5L, group.nextRecord(new RecordInfo(), time.milliseconds()).offset());
+        assertEquals(1, group.numBuffered());
+
+        assertEquals(20L, group.nextRecord(new RecordInfo(), time.milliseconds()).offset());
+        assertEquals(0, group.numBuffered());
+    }
+
@Test
public void shouldNeverWaitIfIdlingIsDisabled() {
final PartitionGroup group = new PartitionGroup(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 8555e4d065b..5dab5329026 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -157,8 +157,8 @@ public class StreamTaskTest {
private final LogContext logContext = new LogContext("[test] ");
private final String topic1 = "topic1";
private final String topic2 = "topic2";
- private final TopicPartition partition1 = new TopicPartition(topic1, 1);
- private final TopicPartition partition2 = new TopicPartition(topic2, 1);
+ private final TopicPartition partition1 = new TopicPartition(topic1, 0);
+ private final TopicPartition partition2 = new TopicPartition(topic2, 0);
private final Set<TopicPartition> partitions = new
HashSet<>(List.of(partition1, partition2));
private final Serializer<Integer> intSerializer = new IntegerSerializer();
private final Deserializer<Integer> intDeserializer = new
IntegerDeserializer();
@@ -1082,6 +1082,70 @@ public class StreamTaskTest {
assertEquals(0, consumer.paused().size());
}
+    @Test
+    public void shouldResumePartitionWhenSkippingOverRecordsWithInvalidTs() {
+        // Records dropped while the queue advances its head (invalid timestamp, corrupted data) must be
+        // removed from the buffered-record count, so a paused partition can be resumed once space frees up.
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+        task = createStatelessTask(createConfig(
+            StreamsConfig.AT_LEAST_ONCE,
+            "-1",
+            LogAndContinueExceptionHandler.class,
+            LogAndFailProcessingExceptionHandler.class,
+            LogAndSkipOnInvalidTimestamp.class
+        ));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        task.addRecords(partition1, asList(
+            getConsumerRecordWithOffsetAsTimestamp(partition1, 10),
+            getConsumerRecordWithOffsetAsTimestamp(partition1, 20),
+            getConsumerRecordWithInvalidTimestamp(30),
+            getConsumerRecordWithInvalidTimestamp(40),
+            getConsumerRecordWithInvalidTimestamp(50)
+        ));
+        assertTrue(consumer.paused().contains(partition1));
+
+        assertTrue(task.process(0L));
+
+        task.resumePollingForPartitionsWithAvailableSpace();
+        assertTrue(consumer.paused().contains(partition1));
+
+        assertTrue(task.process(0L)); // processes offset 20 and advances the head past the invalid records
+
+        task.resumePollingForPartitionsWithAvailableSpace();
+        assertEquals(0, consumer.paused().size());
+
+        assertTrue(task.process(0L)); // drain head record (i.e., last invalid record)
+        assertFalse(task.process(0L));
+        assertFalse(task.hasRecordsQueued());
+
+        // repeat the test for records that fail deserialization
+        // (the task above is configured with LogAndContinueExceptionHandler)
+        task.resumePollingForPartitionsWithAvailableSpace();
+        task.addRecords(partition1, asList(
+            getConsumerRecordWithOffsetAsTimestamp(partition1, 110),
+            getConsumerRecordWithOffsetAsTimestamp(partition1, 120),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(130),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(140),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(150)
+        ));
+        assertTrue(consumer.paused().contains(partition1));
+
+        assertTrue(task.process(0L));
+
+        task.resumePollingForPartitionsWithAvailableSpace();
+        assertTrue(consumer.paused().contains(partition1));
+
+        assertTrue(task.process(0L)); // processes offset 120 and advances the head past the corrupted records
+
+        task.resumePollingForPartitionsWithAvailableSpace();
+        assertEquals(0, consumer.paused().size());
+
+        assertTrue(task.process(0L)); // drain head record (i.e., last corrupted record)
+        assertFalse(task.process(0L));
+        assertFalse(task.hasRecordsQueued());
+    }
+
@Test
public void shouldPunctuateOnceStreamTimeAfterGap() {
when(stateManager.taskId()).thenReturn(taskId);
@@ -3314,7 +3378,7 @@ public class StreamTaskTest {
private ConsumerRecord<byte[], byte[]>
getConsumerRecordWithOffsetAsTimestamp(final Integer key, final long offset) {
return new ConsumerRecord<>(
topic1,
- 1,
+ 0,
offset,
offset, // use the offset as the timestamp
TimestampType.CREATE_TIME,
@@ -3330,7 +3394,7 @@ public class StreamTaskTest {
private ConsumerRecord<byte[], byte[]>
getConsumerRecordWithInvalidTimestamp(final long offset) {
return new ConsumerRecord<>(
topic1,
- 1,
+ 0,
offset,
-1L, // invalid (negative) timestamp
TimestampType.CREATE_TIME,
@@ -3347,24 +3411,24 @@ public class StreamTaskTest {
final long offset,
final int leaderEpoch) {
return new ConsumerRecord<>(
- topicPartition.topic(),
- topicPartition.partition(),
- offset,
- offset, // use the offset as the timestamp
- TimestampType.CREATE_TIME,
- 0,
- 0,
- recordKey,
- recordValue,
- new RecordHeaders(),
- Optional.of(leaderEpoch)
+ topicPartition.topic(),
+ topicPartition.partition(),
+ offset,
+ offset, // use the offset as the timestamp
+ TimestampType.CREATE_TIME,
+ 0,
+ 0,
+ recordKey,
+ recordValue,
+ new RecordHeaders(),
+ Optional.of(leaderEpoch)
);
}
private ConsumerRecord<byte[], byte[]>
getCorruptedConsumerRecordWithOffsetAsTimestamp(final long offset) {
return new ConsumerRecord<>(
topic1,
- 1,
+ 0,
offset,
offset, // use the offset as the timestamp
TimestampType.CREATE_TIME,