Hi Chris,

I am not sure if you are able to see the images I shared with you, so I am
copying the code snippets below.

        if (expectedRecordCount >= 0) {
          int missingCount = expectedRecordCount - (int) this.recordOffset() - 1;
          if (missingCount > 0) {
            if (transactionContext != null) {
              isMissedRecords = true;
            } else {
              throw new DataException(String.format("Missing %d records (expecting %d, actual %d)",
                  missingCount, expectedRecordCount, this.recordOffset()));
            }
          } else if (missingCount < 0) {
            if (transactionContext != null) {
              isMissedRecords = true;
            } else {
              throw new DataException(String.format("Too many records (expecting %d, actual %d)",
                  expectedRecordCount, this.recordOffset()));
            }
          }
        }
        addLastRecord(records, null, value);
      }
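For reference, the decision that snippet makes can be isolated into a small
self-contained sketch (the RecordCountCheck class and all of its names are
illustrative, not our actual connector code):

```java
// Illustrative sketch only: isolates the record-count validation from the
// snippet above. RecordCountCheck and its names are hypothetical.
public class RecordCountCheck {

    public enum Result { OK, MISSING_RECORDS, TOO_MANY_RECORDS }

    /**
     * Compares the expected record count from the file header against the
     * zero-based offset of the last record actually read. A negative
     * expected count means the header declared no count, so no check runs.
     */
    public static Result check(int expectedRecordCount, long lastRecordOffset) {
        if (expectedRecordCount < 0) {
            return Result.OK; // header did not declare a count
        }
        int missingCount = expectedRecordCount - (int) lastRecordOffset - 1;
        if (missingCount > 0) {
            return Result.MISSING_RECORDS;
        } else if (missingCount < 0) {
            return Result.TOO_MANY_RECORDS;
        }
        return Result.OK;
    }
}
```

With a 5-record file, an offset of 4 on the last record passes the check,
while a smaller or larger offset reports missing or extra records.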



        // asn1 or binary abort
        if ((config.parseErrorThreshold != null && parseErrorCount >= config.parseErrorThreshold
                && lastbatch && transactionContext != null)
            || (isMissedRecords && transactionContext != null && lastbatch)) {
          log.info("Transaction is aborting");
          log.info("records = {}", records);
          if (!records.isEmpty()) {
            log.info("with record");
            transactionContext.abortTransaction(records.get(records.size() - 1));
          } else {
            log.info("without record");
            transactionContext.abortTransaction();
          }
        }

Thanks,
Nitty

On Wed, Mar 8, 2023 at 11:38 PM NITTY BENNY <nittybe...@gmail.com> wrote:

> Hi Chris,
> Sorry for the typo in my previous email.
>
> Regarding point 2: *"the task returns a batch of records from
> SourceTask::poll (and, if using the per-record API provided by the
> TransactionContext class, includes at least one record that should
> trigger a transaction commit/abort in that batch)"*
> What if I am using the API without passing a record? We have two types of
> use cases. In the first, on encountering an error record we want to commit
> the previous successful batches and disregard the failed record and any
> upcoming batches. In this case we create the transactionContext just before
> reading the file (the file is our transaction boundary). Say I have a file
> with 5 records and a batch size of 2, and my 3rd batch contains one error
> record; then in that batch I don't have a valid record to pass to commit or
> abort, but I still want to commit all the previous batches that were
> successfully parsed. How do I do that?
>
> The second use case is where I want to abort a transaction if the record
> count doesn't match.
> Code snippet:
> [image: image.png]
> There are no error records in this case. As you can see, I added the
> transactionContext check to implement exactly-once. Without it, the code
> just threw the exception without calling the addLastRecord() method, and
> the catch block simply logged the message and returned the list of
> records, minus the last record, to poll(). To make it work, I called
> addLastRecord() in this case, so the exception is not thrown and the list
> contains the last record as well. Then I called abort, and everything got
> aborted. Why does abort not work without adding the last record to the
> list?
> [image: image.png]
>
> Code to call abort.
>
>
>
>
> Thanks,
> Nitty
>
> On Wed, Mar 8, 2023 at 4:26 PM Chris Egerton <chr...@aiven.io.invalid>
> wrote:
>
>> Hi Nitty,
>>
>> I'm a little confused about what you mean by this part:
>>
>> > transaction is not getting completed because it is not committing the
>> > transaction offset.
>>
>> The only conditions required for a transaction to be completed when a
>> connector is defining its own transaction boundaries are:
>>
>> 1. The task requests a transaction commit/abort from the
>> TransactionContext
>> 2. The task returns a batch of records from SourceTask::poll (and, if using
>> the per-record API provided by the TransactionContext class, includes at
>> least one record that should trigger a transaction commit/abort in that
>> batch)
>>
>> The Connect runtime should automatically commit source offsets to Kafka
>> whenever a transaction is completed, either by commit or abort. This is
>> because transactions should only be aborted for data that should never be
>> re-read by the connector; if there is a validation error that should be
>> handled by reconfiguring the connector, then the task should throw an
>> exception instead of aborting the transaction.
>>
>> If possible, do you think you could provide a brief code snippet
>> illustrating what your task is doing that's causing issues?
>>
>> Cheers,
>>
>> Chris (not Chrise 🙂)
>>
>> On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY <nittybe...@gmail.com> wrote:
>>
>> > Hi Chrise,
>> >
>> > Thanks for sharing the details.
>> >
>> > Regarding the use case: for the Asn1 source connector we need to
>> > validate the number of records in the file against the number of
>> > records in the header. Currently, if validation fails we do not send
>> > the last record to the topic. But after introducing exactly-once with
>> > the connector transaction boundary, I can see that if I call an abort
>> > when the validation fails, the transaction is not getting completed
>> > because it is not committing the transaction offset. I saw that the
>> > transaction state changed to CompleteAbort, so for my next transaction
>> > I got InvalidProducerEpochException and the task stopped after that.
>> > When I tried calling the abort after sending the last record to the
>> > topic, the transaction got completed.
>> >
>> > I don't know if I am doing anything wrong here.
>> >
>> > Please advise.
>> > Thanks,
>> > Nitty
>> >
>> > On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton <chr...@aiven.io.invalid>
>> > wrote:
>> >
>> > > Hi Nitty,
>> > >
>> > > We've recently added some documentation on implementing exactly-once
>> > > source connectors here:
>> > > https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors
>> > >
>> > > To quote a relevant passage from those docs:
>> > >
>> > > > In order for a source connector to take advantage of this support, it
>> > > > must be able to provide meaningful source offsets for each record that
>> > > > it emits, and resume consumption from the external system at the exact
>> > > > position corresponding to any of those offsets without dropping or
>> > > > duplicating messages.
>> > >
>> > > So, as long as your source connector is able to use the Kafka Connect
>> > > framework's offsets API correctly, it shouldn't be necessary to make any
>> > > other code changes to the connector.
>> > >
>> > > To enable exactly-once support for source connectors on your Connect
>> > > cluster, see the docs section here:
>> > > https://kafka.apache.org/documentation/#connect_exactlyoncesource
>> > >
>> > > With regard to transactions, a transactional producer is always created
>> > > automatically for your connector by the Connect runtime when
>> > > exactly-once support is enabled on the worker. The only reason to set
>> > > "transaction.boundary" to "connector" is if your connector would like
>> > > to explicitly define its own transaction boundaries. In this case, that
>> > > sounds like what you want; I just want to make sure to call out that in
>> > > either case, you should not be directly instantiating a producer in
>> > > your connector code, but let the Kafka Connect runtime do that for you,
>> > > and just worry about returning the right records from SourceTask::poll
>> > > (and possibly defining custom transactions using the TransactionContext
>> > > API).
>> > >
>> > > With respect to your question about committing or aborting in certain
>> > > circumstances, it'd be useful to know more about your use case, since
>> > > it may not be necessary to define custom transaction boundaries in
>> > > your connector at all.
>> > >
>> > > Cheers,
>> > >
>> > > Chris
>> > >
>> > >
>> > >
>> > > On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY <nittybe...@gmail.com>
>> wrote:
>> > >
>> > > > Hi Team,
>> > > >
>> > > > Adding on top of this, I tried creating a TransactionContext object
>> > > > and calling the commitTransaction and abortTransaction methods in
>> > > > source connectors. But the main problem I saw is that if there is any
>> > > > error while parsing the record, Connect calls an abort, but we have a
>> > > > use case to call commit in some cases. Is that a valid use case in
>> > > > terms of Kafka Connect?
>> > > >
>> > > > Another question: should I use a transactional producer instead of
>> > > > creating an object of TransactionContext? Below is the connector
>> > > > configuration I am using.
>> > > >
>> > > >   exactly.once.support: "required"
>> > > >   transaction.boundary: "connector"
>> > > >
>> > > > Could you please help me here?
>> > > >
>> > > > Thanks,
>> > > > Nitty
>> > > >
>> > > > On Tue, Mar 7, 2023 at 12:29 AM NITTY BENNY <nittybe...@gmail.com>
>> > > wrote:
>> > > >
>> > > > > Hi Team,
>> > > > > I am trying to implement exactly-once behavior in our source
>> > > > > connector. Is there any sample source connector implementation
>> > > > > available to have a look at?
>> > > > > Regards,
>> > > > > Nitty
>> > > > >
>> > > >
>> > >
>> >
>>
>
