Hi Nitty,

We've recently added some documentation on implementing exactly-once source
connectors here:
https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors.
To quote a relevant passage from those docs:

> In order for a source connector to take advantage of this support, it
> must be able to provide meaningful source offsets for each record that it
> emits, and resume consumption from the external system at the exact
> position corresponding to any of those offsets without dropping or
> duplicating messages.

So, as long as your source connector is able to use the Kafka Connect
framework's offsets API correctly, it shouldn't be necessary to make any
other code changes to the connector.
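
To make the offsets requirement concrete, here is a minimal sketch in plain
Java. It is a model only and does not use the Kafka Connect API: the names
ResumableSource, Emitted, and the "position" offset key are all hypothetical,
and the in-memory list stands in for your external system. The point is that
every emitted record carries an offset that is sufficient, on its own, to
resume reading immediately after that record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java model of offset-based resumption. NOT the Kafka Connect API;
// every name in this block is hypothetical and chosen for illustration.
final class ResumableSource {

    // A record paired with the offset needed to resume right after it.
    static final class Emitted {
        final String value;
        final Map<String, Long> offset;

        Emitted(String value, Map<String, Long> offset) {
            this.value = value;
            this.offset = offset;
        }
    }

    private final List<String> externalSystem; // stand-in for the external system
    private long next;                          // index of the next unread entry

    // lastCommittedOffset is null on a first start; otherwise it is the
    // offset of the last record known to have been committed.
    ResumableSource(List<String> externalSystem, Map<String, Long> lastCommittedOffset) {
        this.externalSystem = externalSystem;
        this.next = lastCommittedOffset == null
                ? 0L
                : lastCommittedOffset.get("position") + 1;
    }

    // Returns up to maxRecords records, each tagged with its own offset.
    List<Emitted> poll(int maxRecords) {
        List<Emitted> batch = new ArrayList<>();
        while (batch.size() < maxRecords && next < externalSystem.size()) {
            String value = externalSystem.get((int) next);
            batch.add(new Emitted(value, Map.of("position", next)));
            next++;
        }
        return batch;
    }
}

final class ResumableSourceDemo {
    public static void main(String[] args) {
        List<String> data = List.of("a", "b", "c", "d", "e");

        // First run: emit three records, then pretend the task stops.
        ResumableSource first = new ResumableSource(data, null);
        List<ResumableSource.Emitted> batch = first.poll(3);
        Map<String, Long> committed = batch.get(batch.size() - 1).offset;

        // Restart from the committed offset: nothing dropped, nothing repeated.
        ResumableSource restarted = new ResumableSource(data, committed);
        for (ResumableSource.Emitted e : restarted.poll(10)) {
            System.out.println(e.value); // prints "d" then "e"
        }
    }
}
```

In a real connector the framework stores and returns the offsets for you;
the sketch only shows the property your offsets need to have.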

To enable exactly-once support for source connectors on your Connect
cluster, see the docs section here:
https://kafka.apache.org/documentation/#connect_exactlyoncesource
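
As a rough illustration of what that section describes, the worker-level
setting looks something like the fragment below. This is a sketch, not a
complete worker config, and it assumes a distributed-mode cluster; check the
linked docs for the rollout procedure that applies to your Kafka version.

```properties
# Worker config fragment (distributed mode), set on every worker.
# When upgrading an existing cluster, the docs describe first rolling out
# "preparing" on all workers and only then switching to "enabled".
exactly.once.source.support=enabled
```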

With regard to transactions, a transactional producer is always created
automatically for your connector by the Connect runtime when exactly-once
support is enabled on the worker. The only reason to set
"transaction.boundary" to "connector" is if your connector would like to
explicitly define its own transaction boundaries. In this case, it sounds
like that may be what you want. Either way, your connector code should not
instantiate a producer directly; let the Kafka Connect runtime do that for
you, and just worry about returning the right records from SourceTask::poll
(and possibly defining custom transactions using the TransactionContext
API).
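
For reference, here is a sketch of the connector-level properties involved,
written in properties style rather than the YAML-like form you posted. The
values shown are the ones from your message; the comments summarize my
understanding of the options, so please verify them against the docs.

```properties
# Connector config fragment (not a complete connector config).
# "required" fails the connector if exactly-once cannot be guaranteed;
# "requested" is the more lenient alternative.
exactly.once.support=required
# "poll" (the default) wraps each batch returned from poll() in its own
# transaction, "interval" commits on a timer, and "connector" lets the
# connector define boundaries itself through the TransactionContext API.
transaction.boundary=connector
```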

With respect to your question about committing or aborting in certain
circumstances, it'd be useful to know more about your use case, since it
may not be necessary to define custom transaction boundaries in your
connector at all.

Cheers,

Chris



On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY <nittybe...@gmail.com> wrote:

> Hi Team,
>
> Adding on top of this, I tried creating a TransactionContext object and
> calling the commitTransaction and abortTransaction methods in source
> connectors.
> But the main problem I saw is that if there is any error while parsing the
> record, Connect calls abort, but we have a use case to call commit
> in some cases. Is it a valid use case in terms of Kafka Connect?
>
> Another question: should I use a transactional producer instead of
> creating an object of TransactionContext? Below is the connector
> configuration I am using.
>
>   exactly.once.support: "required"
>   transaction.boundary: "connector"
>
> Could you please help me here?
>
> Thanks,
> Nitty
>
> On Tue, Mar 7, 2023 at 12:29 AM NITTY BENNY <nittybe...@gmail.com> wrote:
>
> > Hi Team,
> > I am trying to implement exactly-once behavior in our source connector.
> > Is there any sample source connector implementation available to have a
> > look at?
> > Regards,
> > Nitty
> >
>
