Hi Chris,
Sorry for the typo in my previous email.

Regarding the point 2,* the task returns a batch of records from
SourceTask::poll (and, if using*


*the per-record API provided by the TransactionContext class, includes
atleast one record that should trigger a transaction commit/abort in
thatbatch)*
What if I am using the API without passing a record? We have 2 types of use
cases, one where on encountering an error record, we want to commit
previous successful batches and disregard the failed record and upcoming
batches. In this case we created the transactionContext just before reading
the file (file is our transaction boundary).Say if I have a file with 5
records and batch size is 2, and in my 3rd batch I have one error record
then in that batch, I dont have a valid record to call commit or abort. But
I want to commit all the previous batches that were successfully parsed.
How do I do that?

Second use case is where I want to abort a transaction if the record count
doesn't match.
Code Snippet :
[image: image.png]
There are no error records in this case. If you see I added the condition
of transactionContext check to implement exactly once, without
transaction it was just throwing the exception without calling the
addLastRecord() method and in the catch block it just logs the message and
return the list of records without the last record to poll().To make it
work, I called the method addLastRecord() in this case, so it is not
throwing the exception and list has last record as well. Then I called the
abort, everything got aborted. Why is abort not working without adding the
last record to the list?
[image: image.png]

Code to call abort.




Thanks,
Nitty

On Wed, Mar 8, 2023 at 4:26 PM Chris Egerton <chr...@aiven.io.invalid>
wrote:

> Hi Nitty,
>
> I'm a little confused about what you mean by this part:
>
> > transaction is not getting completed because it is not commiting the
> transaction offest.
>
> The only conditions required for a transaction to be completed when a
> connector is defining its own transaction boundaries are:
>
> 1. The task requests a transaction commit/abort from the TransactionContext
> 2. The task returns a batch of records from SourceTask::poll (and, if using
> the per-record API provided by the TransactionContext class, includes at
> least one record that should trigger a transaction commit/abort in that
> batch)
>
> The Connect runtime should automatically commit source offsets to Kafka
> whenever a transaction is completed, either by commit or abort. This is
> because transactions should only be aborted for data that should never be
> re-read by the connector; if there is a validation error that should be
> handled by reconfiguring the connector, then the task should throw an
> exception instead of aborting the transaction.
>
> If possible, do you think you could provide a brief code snippet
> illustrating what your task is doing that's causing issues?
>
> Cheers,
>
> Chris (not Chrise 🙂)
>
> On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY <nittybe...@gmail.com> wrote:
>
> > Hi Chrise,
> >
> > Thanks for sharing the details.
> >
> > Regarding the use case, For Asn1 source connector we have a use case to
> > validate number of records in the file with the number of records in the
> > header. So currently, if validation fails we are not sending the last
> > record to the topic. But after introducing exactly once with connector
> > transaction boundary, I can see that if I call an abort when the
> validation
> > fails, transaction is not getting completed because it is not commiting
> the
> > transaction offest. I saw that transaction state changed to
> CompleteAbort.
> > So for my next transaction I am getting InvalidProducerEpochException and
> > then task stopped after that. I tried calling the abort after sending
> last
> > record to the topic then transaction getting completed.
> >
> > I dont know if I am doing anything wrong here.
> >
> > Please advise.
> > Thanks,
> > Nitty
> >
> > On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton <chr...@aiven.io.invalid>
> > wrote:
> >
> > > Hi Nitty,
> > >
> > > We've recently added some documentation on implementing exactly-once
> > source
> > > connectors here:
> > >
> >
> https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors
> > > .
> > > To quote a relevant passage from those docs:
> > >
> > > > In order for a source connector to take advantage of this support, it
> > > must be able to provide meaningful source offsets for each record that
> it
> > > emits, and resume consumption from the external system at the exact
> > > position corresponding to any of those offsets without dropping or
> > > duplicating messages.
> > >
> > > So, as long as your source connector is able to use the Kafka Connect
> > > framework's offsets API correctly, it shouldn't be necessary to make
> any
> > > other code changes to the connector.
> > >
> > > To enable exactly-once support for source connectors on your Connect
> > > cluster, see the docs section here:
> > > https://kafka.apache.org/documentation/#connect_exactlyoncesource
> > >
> > > With regard to transactions, a transactional producer is always created
> > > automatically for your connector by the Connect runtime when
> exactly-once
> > > support is enabled on the worker. The only reason to set
> > > "transaction.boundary" to "connector" is if your connector would like
> to
> > > explicitly define its own transaction boundaries. In this case, it
> sounds
> > > like may be what you want; I just want to make sure to call out that in
> > > either case, you should not be directly instantiating a producer in
> your
> > > connector code, but let the Kafka Connect runtime do that for you, and
> > just
> > > worry about returning the right records from SourceTask::poll (and
> > possibly
> > > defining custom transactions using the TransactionContext API).
> > >
> > > With respect to your question about committing or aborting in certain
> > > circumstances, it'd be useful to know more about your use case, since
> it
> > > may not be necessary to define custom transaction boundaries in your
> > > connector at all.
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > >
> > >
> > > On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY <nittybe...@gmail.com>
> wrote:
> > >
> > > > Hi Team,
> > > >
> > > > Adding on top of this, I tried creating a TransactionContext object
> and
> > > > calling the commitTransaction and abortTranaction methods in source
> > > > connectors.
> > > > But the main problem I saw is that if there is any error while
> parsing
> > > the
> > > > record, connect is calling an abort but we have a use case to call
> > commit
> > > > in some cases. Is it a valid use case in terms of kafka connect?
> > > >
> > > > Another Question - Should I use a transactional producer instead
> > > > creating an object of TransactionContext? Below is the connector
> > > > configuration I am using.
> > > >
> > > >   exactly.once.support: "required"
> > > >   transaction.boundary: "connector"
> > > >
> > > > Could you please help me here?
> > > >
> > > > Thanks,
> > > > Nitty
> > > >
> > > > On Tue, Mar 7, 2023 at 12:29 AM NITTY BENNY <nittybe...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Team,
> > > > > I am trying to implement exactly once behavior in our source
> > connector.
> > > > Is
> > > > > there any sample source connector implementation available to have
> a
> > > look
> > > > > at?
> > > > > Regards,
> > > > > Nitty
> > > > >
> > > >
> > >
> >
>

Reply via email to