This is an automated email from the ASF dual-hosted git repository.
dlg99 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-connectors.git
The following commit(s) were added to refs/heads/master by this push:
new f172336f [fix][io] Fix delayed acknowledgment for offset 0 in Kafka Connect Adapter (#24)
f172336f is described below
commit f172336f3576b2dd3446a76d99ba84efb85f4f25
Author: Malla Sandeep <[email protected]>
AuthorDate: Tue May 12 02:53:40 2026 +0530
[fix][io] Fix delayed acknowledgment for offset 0 in Kafka Connect Adapter (#24)
* Update the offset check in currentOffsets() to include offset 0 (offset >= 0 instead of offset > 0) and add a test
* Update the test to check the offset value returned by currentOffsets()
---
.../apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java | 2 +-
.../java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java | 4 ++++
2 files changed, 5 insertions(+), 1 deletion(-)
diff --git
a/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
index 76ff8c0c..8567c502 100644
---
a/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
+++
b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java
@@ -126,7 +126,7 @@ public class PulsarKafkaSinkTaskContext implements
SinkTaskContext {
public Map<TopicPartition, OffsetAndMetadata> currentOffsets() {
Map<TopicPartition, OffsetAndMetadata> snapshot =
Maps.newHashMapWithExpectedSize(currentOffsets.size());
currentOffsets.forEach((topicPartition, offset) -> {
- if (offset > 0) {
+ if (offset >= 0) {
snapshot.put(topicPartition,
new OffsetAndMetadata(offset, Optional.empty(), null));
}
diff --git
a/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
b/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 74af038b..a7e45de7 100644
---
a/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++
b/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -1207,6 +1207,10 @@ public class KafkaConnectSinkTest extends
ProducerConsumerBase {
// offset is 0 for the first written record
assertEquals(sink.currentOffset(topicName, partition), 0);
+ // current offsets map returned by the PulsarKafkaSinkTaskContext should contain the record with offset 0
+ assertEquals(
+ sink.taskContext.currentOffsets().get(new
TopicPartition(topicName, partition)).offset(), 0
+ );
entryId.set(1);
sink.write(record);