This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 88a640286fc MINOR: Update kafka-console-share-consumer to pass in
delivery count. (#19447)
88a640286fc is described below
commit 88a640286fcf5675d1ea48fcc4065d262b1042ac
Author: Shivsundar R <[email protected]>
AuthorDate: Fri Apr 11 10:28:56 2025 -0400
MINOR: Update kafka-console-share-consumer to pass in delivery count.
(#19447)
There was a bug in `ConsoleShareConsumer` where the delivery count
received in the `ShareFetchResponse` was not sent to the formatter to be
shown to the user. PR fixes the bug by passing in the delivery count in
a different constructor for `ConsumerRecord`.
Reviewers: Andrew Schofield <[email protected]>
---
.../kafka/tools/consumer/ConsoleShareConsumer.java | 2 +-
.../tools/consumer/ConsoleShareConsumerTest.java | 44 ++++++++++++++++++++++
2 files changed, 45 insertions(+), 1 deletion(-)
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java
index 1aaa7af5b84..699397f9a70 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java
@@ -111,7 +111,7 @@ public class ConsoleShareConsumer {
messageCount += 1;
try {
formatter.writeTo(new ConsumerRecord<>(msg.topic(),
msg.partition(), msg.offset(), msg.timestamp(), msg.timestampType(),
- 0, 0, msg.key(), msg.value(), msg.headers(),
Optional.empty()), output);
+ 0, 0, msg.key(), msg.value(), msg.headers(),
Optional.empty(), msg.deliveryCount()), output);
consumer.acknowledge(msg, acknowledgeType);
} catch (Throwable t) {
if (rejectMessageOnError) {
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerTest.java
index 5e3f7a16db5..5bcdb15235d 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerTest.java
@@ -22,15 +22,21 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ShareConsumer;
import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.MockTime;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
import java.io.PrintStream;
import java.time.Duration;
+import java.util.Optional;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
@@ -131,4 +137,42 @@ public class ConsoleShareConsumerTest {
consumer.cleanup();
}
+
+ @Test
+ public void shouldUpgradeDeliveryCount() {
+ // Mock dependencies
+ ConsoleShareConsumer.ConsumerWrapper consumer =
mock(ConsoleShareConsumer.ConsumerWrapper.class);
+ MessageFormatter formatter = mock(MessageFormatter.class);
+ PrintStream printStream = mock(PrintStream.class);
+
+ short deliveryCount = 1;
+ // Mock a ConsumerRecord with a delivery count
+ ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
+ "test-topic", 0, 0, RecordBatch.NO_TIMESTAMP,
TimestampType.NO_TIMESTAMP_TYPE, 0,
+ 0, new byte[0], new byte[0], new RecordHeaders(),
Optional.empty(), Optional.of(deliveryCount)
+ );
+
+ // Mock consumer behavior
+ when(consumer.receive()).thenReturn(record);
+
+ // Process the record
+ ConsoleShareConsumer.process(1, formatter, consumer, printStream,
false, AcknowledgeType.ACCEPT);
+
+ // Capture the actual ConsumerRecord passed to formatter.writeTo
+ ArgumentCaptor<ConsumerRecord> captor =
ArgumentCaptor.forClass(ConsumerRecord.class);
+ verify(formatter).writeTo(captor.capture(), eq(printStream));
+
+ // Assert that the captured ConsumerRecord matches the expected values
+ ConsumerRecord<byte[], byte[]> capturedRecord = captor.getValue();
+ assertEquals("test-topic", capturedRecord.topic());
+ assertEquals(0, capturedRecord.partition());
+ assertEquals(0, capturedRecord.offset());
+ assertEquals(deliveryCount,
capturedRecord.deliveryCount().orElse((short) 0));
+
+ // Verify that the consumer acknowledges the record
+ verify(consumer).acknowledge(record, AcknowledgeType.ACCEPT);
+
+ // Cleanup
+ consumer.cleanup();
+ }
}