Hi Chris,

Thanks for sharing the details.

Regarding the use case: for the Asn1 source connector, we need to validate
the number of records in the file against the record count in the header.
Currently, if that validation fails we do not send the last record to the
topic. But after introducing exactly-once with connector-defined transaction
boundaries, I can see that if I call an abort when the validation fails, the
transaction never completes because the transaction offset is not committed;
the transaction state changes to CompleteAbort. For my next transaction I
then get an InvalidProducerEpochException, and the task stops after that. If
I instead call the abort after sending the last record to the topic, the
transaction completes.
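
To make it concrete, here is a stripped-down sketch of what the task does in
poll(). The class and helper names are only placeholders to illustrate the
pattern, not our real code:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.TransactionContext;

public class Asn1SourceTask extends SourceTask {

    @Override
    public String version() {
        return "0.0.1";
    }

    @Override
    public void start(Map<String, String> props) {
        // open the file, read the expected record count from the header, etc.
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // null when the connector is not defining its own transaction boundaries
        TransactionContext txn = context.transactionContext();

        List<SourceRecord> records = readNextBatch();   // placeholder for our parsing

        if (txn == null || records.isEmpty()) {
            return records;
        }

        SourceRecord last = records.get(records.size() - 1);

        if (recordCountMatchesHeader(records)) {        // placeholder validation
            // Commit the transaction once the last record of the file is processed.
            txn.commitTransaction(last);
            return records;
        }

        // Validation failed. Calling txn.abortTransaction() and dropping the last
        // record left the transaction incomplete, and the next transaction failed
        // with InvalidProducerEpochException. Tying the abort to the last record
        // and still returning it is what makes the transaction complete for me.
        txn.abortTransaction(last);
        return records;
    }

    @Override
    public void stop() {
        // close the file
    }

    // --- placeholders, not the real implementation ---
    private List<SourceRecord> readNextBatch() { return new ArrayList<>(); }
    private boolean recordCountMatchesHeader(List<SourceRecord> records) { return true; }
}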

I don't know if I am doing anything wrong here.

Please advise.
Thanks,
Nitty

On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton <chr...@aiven.io.invalid>
wrote:

> Hi Nitty,
>
> We've recently added some documentation on implementing exactly-once source
> connectors here:
> https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors
> .
> To quote a relevant passage from those docs:
>
> > In order for a source connector to take advantage of this support, it
> must be able to provide meaningful source offsets for each record that it
> emits, and resume consumption from the external system at the exact
> position corresponding to any of those offsets without dropping or
> duplicating messages.
>
> So, as long as your source connector is able to use the Kafka Connect
> framework's offsets API correctly, it shouldn't be necessary to make any
> other code changes to the connector.
>
> To enable exactly-once support for source connectors on your Connect
> cluster, see the docs section here:
> https://kafka.apache.org/documentation/#connect_exactlyoncesource
>
> With regard to transactions, a transactional producer is always created
> automatically for your connector by the Connect runtime when exactly-once
> support is enabled on the worker. The only reason to set
> "transaction.boundary" to "connector" is if your connector would like to
> explicitly define its own transaction boundaries. In this case, it sounds
> like this may be what you want; I just want to make sure to call out that in
> either case, you should not be directly instantiating a producer in your
> connector code, but let the Kafka Connect runtime do that for you, and just
> worry about returning the right records from SourceTask::poll (and possibly
> defining custom transactions using the TransactionContext API).
>
> With respect to your question about committing or aborting in certain
> circumstances, it'd be useful to know more about your use case, since it
> may not be necessary to define custom transaction boundaries in your
> connector at all.
>
> Cheers,
>
> Chris
>
>
>
> On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY <nittybe...@gmail.com> wrote:
>
> > Hi Team,
> >
> > Adding on top of this, I tried creating a TransactionContext object and
> > calling the commitTransaction and abortTransaction methods in source
> > connectors.
> > But the main problem I saw is that if there is any error while parsing
> > the record, Connect calls an abort, but we have a use case to call commit
> > in some cases. Is that a valid use case in terms of Kafka Connect?
> >
> > Another question: should I use a transactional producer instead of
> > creating a TransactionContext object? Below is the connector
> > configuration I am using.
> >
> >   exactly.once.support: "required"
> >   transaction.boundary: "connector"
> >
> > Could you please help me here?
> >
> > Thanks,
> > Nitty
> >
> > On Tue, Mar 7, 2023 at 12:29 AM NITTY BENNY <nittybe...@gmail.com> wrote:
> >
> > > Hi Team,
> > > I am trying to implement exactly once behavior in our source connector.
> > > Is there any sample source connector implementation available to have a
> > > look at?
> > > Regards,
> > > Nitty
> > >
> >
>
