sandeep-mst opened a new pull request, #24:
URL: https://github.com/apache/pulsar-connectors/pull/24

   
   Fixes #23
   
   ### Motivation
   
   
[PulsarKafkaSinkTaskContext.currentOffsets()](https://github.com/apache/pulsar-connectors/blob/b8760955f20db284c7779e38ca993ecedb5e563c/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java#L126)
 included only offsets greater than `0`, so offset `0` was filtered out. 
The first record in a partition, whose offset can legitimately be `0`, 
was therefore omitted from the offsets snapshot returned to 
[KafkaConnectSink#flush()](https://github.com/apache/pulsar-connectors/blob/b8760955f20db284c7779e38ca993ecedb5e563c/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java#L242).
 As a result, the acknowledgment for that record was not flushed until a later 
record with a non-zero offset was processed.
   
   This change ensures that offset `0` is treated as a valid offset and is 
included in the current offsets map, so the first record can be acknowledged 
without waiting for another record.
   
   ### Modifications
   
   The offset filter in PulsarKafkaSinkTaskContext.currentOffsets() was changed 
from:
   `if (offset > 0) {`
   to:
   `if (offset >= 0) {`
   
   This includes records with offset `0` in the returned snapshot.
   An assertion was added to the existing `offsetTest` in `KafkaConnectSinkTest` to 
verify that `currentOffsets()` contains that record and reports offset `0` 
correctly.
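
   The effect of the comparison change can be illustrated with a minimal, standalone sketch. This is not the code in `PulsarKafkaSinkTaskContext`: the class name `OffsetSnapshotSketch`, the string partition keys, and the `-1` sentinel for "no record seen yet" are all assumptions made for illustration. It only contrasts a `> 0` filter with a `>= 0` filter over a map of tracked offsets.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; names and the sentinel value are hypothetical.
public class OffsetSnapshotSketch {

    // Assumed sentinel for a partition that has not seen any record yet.
    static final long NO_OFFSET = -1L;

    // Old behavior: offset 0 is dropped along with the sentinel.
    static Map<String, Long> snapshotExcludingZero(Map<String, Long> tracked) {
        Map<String, Long> snapshot = new HashMap<>();
        for (Map.Entry<String, Long> entry : tracked.entrySet()) {
            long offset = entry.getValue();
            if (offset > 0) {
                snapshot.put(entry.getKey(), offset);
            }
        }
        return snapshot;
    }

    // New behavior: offset 0 is kept, only negative values are dropped.
    static Map<String, Long> snapshotIncludingZero(Map<String, Long> tracked) {
        Map<String, Long> snapshot = new HashMap<>();
        for (Map.Entry<String, Long> entry : tracked.entrySet()) {
            long offset = entry.getValue();
            if (offset >= 0) {
                snapshot.put(entry.getKey(), offset);
            }
        }
        return snapshot;
    }

    public static void main(String[] args) {
        Map<String, Long> tracked = new HashMap<>();
        tracked.put("topic-0", 0L);        // first record of a partition
        tracked.put("topic-1", NO_OFFSET); // nothing received yet
        tracked.put("topic-2", 5L);

        System.out.println(snapshotExcludingZero(tracked).containsKey("topic-0")); // false
        System.out.println(snapshotIncludingZero(tracked).containsKey("topic-0")); // true
    }
}
```

   With the `> 0` filter, the partition whose only record has offset `0` is missing from the snapshot, so nothing would be flushed for it. With `>= 0`, it is present while the negative sentinel is still excluded.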
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   This change added tests and can be verified as follows:
     - *org.apache.pulsar.io.kafka.connect.KafkaConnectSinkTest#offsetTest* 
(updated existing test)
   
   ### Does this pull request potentially affect one of the following parts:
   
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   
   - [ ] `doc` <!-- Your PR contains doc changes. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update 
later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   
   ### Matching PR in forked repository
   
   PR in forked repository: https://github.com/cognitree/pulsar-connectors/pull/6
   

