This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 88a640286fc MINOR: Update kafka-console-share-consumer to pass in delivery count. (#19447)
88a640286fc is described below

commit 88a640286fcf5675d1ea48fcc4065d262b1042ac
Author: Shivsundar R <[email protected]>
AuthorDate: Fri Apr 11 10:28:56 2025 -0400

    MINOR: Update kafka-console-share-consumer to pass in delivery count. (#19447)
    
    There was a bug in `ConsoleShareConsumer` where the delivery count
    received in the `ShareFetchResponse` was not passed to the formatter to
    be shown to the user. This PR fixes the bug by passing the delivery
    count to a different `ConsumerRecord` constructor.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../kafka/tools/consumer/ConsoleShareConsumer.java |  2 +-
 .../tools/consumer/ConsoleShareConsumerTest.java   | 44 ++++++++++++++++++++++
 2 files changed, 45 insertions(+), 1 deletion(-)

diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java
index 1aaa7af5b84..699397f9a70 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java
@@ -111,7 +111,7 @@ public class ConsoleShareConsumer {
             messageCount += 1;
             try {
                 formatter.writeTo(new ConsumerRecord<>(msg.topic(), 
msg.partition(), msg.offset(), msg.timestamp(), msg.timestampType(),
-                        0, 0, msg.key(), msg.value(), msg.headers(), 
Optional.empty()), output);
+                        0, 0, msg.key(), msg.value(), msg.headers(), 
Optional.empty(), msg.deliveryCount()), output);
                 consumer.acknowledge(msg, acknowledgeType);
             } catch (Throwable t) {
                 if (rejectMessageOnError) {
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerTest.java
index 5e3f7a16db5..5bcdb15235d 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerTest.java
@@ -22,15 +22,21 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.ShareConsumer;
 import org.apache.kafka.common.MessageFormatter;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.util.MockTime;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
 
 import java.io.PrintStream;
 import java.time.Duration;
+import java.util.Optional;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
@@ -131,4 +137,42 @@ public class ConsoleShareConsumerTest {
 
         consumer.cleanup();
     }
+
+    @Test
+    public void shouldUpgradeDeliveryCount() {
+        // Mock dependencies
+        ConsoleShareConsumer.ConsumerWrapper consumer = 
mock(ConsoleShareConsumer.ConsumerWrapper.class);
+        MessageFormatter formatter = mock(MessageFormatter.class);
+        PrintStream printStream = mock(PrintStream.class);
+
+        short deliveryCount = 1;
+        // Mock a ConsumerRecord with a delivery count
+        ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
+                "test-topic", 0, 0, RecordBatch.NO_TIMESTAMP, 
TimestampType.NO_TIMESTAMP_TYPE, 0,
+                0, new byte[0], new byte[0], new RecordHeaders(), 
Optional.empty(), Optional.of(deliveryCount)
+        );
+
+        // Mock consumer behavior
+        when(consumer.receive()).thenReturn(record);
+
+        // Process the record
+        ConsoleShareConsumer.process(1, formatter, consumer, printStream, 
false, AcknowledgeType.ACCEPT);
+
+        // Capture the actual ConsumerRecord passed to formatter.writeTo
+        ArgumentCaptor<ConsumerRecord> captor = 
ArgumentCaptor.forClass(ConsumerRecord.class);
+        verify(formatter).writeTo(captor.capture(), eq(printStream));
+
+        // Assert that the captured ConsumerRecord matches the expected values
+        ConsumerRecord<byte[], byte[]> capturedRecord = captor.getValue();
+        assertEquals("test-topic", capturedRecord.topic());
+        assertEquals(0, capturedRecord.partition());
+        assertEquals(0, capturedRecord.offset());
+        assertEquals(deliveryCount, 
capturedRecord.deliveryCount().orElse((short) 0));
+
+        // Verify that the consumer acknowledges the record
+        verify(consumer).acknowledge(record, AcknowledgeType.ACCEPT);
+
+        // Cleanup
+        consumer.cleanup();
+    }
 }

Reply via email to