This is an automated email from the ASF dual-hosted git repository.

dlg99 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-connectors.git


The following commit(s) were added to refs/heads/master by this push:
     new f172336f [fix][io] Fix delayed acknowledgment for offset 0 in Kafka 
Connect Adapter (#24)
f172336f is described below

commit f172336f3576b2dd3446a76d99ba84efb85f4f25
Author: Malla Sandeep <[email protected]>
AuthorDate: Tue May 12 02:53:40 2026 +0530

    [fix][io] Fix delayed acknowledgment for offset 0 in Kafka Connect Adapter 
(#24)
    
    * offsets check is updated and test is added
    * updated the test for checking the offset value returned
---
 .../apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java    | 2 +-
 .../java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java | 4 ++++
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git 
a/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
 
b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
index 76ff8c0c..8567c502 100644
--- 
a/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
+++ 
b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
@@ -126,7 +126,7 @@ public class PulsarKafkaSinkTaskContext implements 
SinkTaskContext {
     public Map<TopicPartition, OffsetAndMetadata> currentOffsets() {
         Map<TopicPartition, OffsetAndMetadata> snapshot = 
Maps.newHashMapWithExpectedSize(currentOffsets.size());
         currentOffsets.forEach((topicPartition, offset) -> {
-            if (offset > 0) {
+            if (offset >= 0) {
                 snapshot.put(topicPartition,
                         new OffsetAndMetadata(offset, Optional.empty(), null));
             }
diff --git 
a/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
 
b/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 74af038b..a7e45de7 100644
--- 
a/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ 
b/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -1207,6 +1207,10 @@ public class KafkaConnectSinkTest extends 
ProducerConsumerBase {
 
         // offset is 0 for the first written record
         assertEquals(sink.currentOffset(topicName, partition), 0);
+        // current offsets map returned by the PulsarKafkaSinkTaskContext 
should contain the record with offset 0
+        assertEquals(
+                sink.taskContext.currentOffsets().get(new 
TopicPartition(topicName, partition)).offset(), 0
+        );
 
         entryId.set(1);
         sink.write(record);

Reply via email to