Hi Chris,

Sorry Chris, I am not able to reproduce the above issue.

I want to share with you one more use case I found.
In this case, the first batch returns 2 valid records, and the next batch
contains an invalid record. Below is the __transaction_state topic entry
from when I call a commit while processing the invalid record.

connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
producerId=11, producerEpoch=2, txnTimeoutMs=60000, state=*Ongoing*,
pendingState=None, topicPartitions=HashSet(streams-input-2),
txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620463834)

Then after some time I saw the states below as well:

connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
producerId=11, producerEpoch=3, txnTimeoutMs=60000, state=*PrepareAbort*,
pendingState=None, topicPartitions=HashSet(streams-input-2),
txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526119)
connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
producerId=11, producerEpoch=3, txnTimeoutMs=60000, state=*CompleteAbort*,
pendingState=None, topicPartitions=HashSet(),
txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526121)

Later, for the next transaction, when the task returns its first batch,
below are the logs I can see:

 Transiting to abortable error state due to
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
attempted to produce with an old epoch.
(org.apache.kafka.clients.producer.internals.TransactionManager)
[kafka-producer-network-thread |
connector-producer-json-sftp-source-connector-0]
2023-03-12 11:32:45,220 ERROR [json-sftp-source-connector|task-0]
ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to send
record to streams-input:
(org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
[kafka-producer-network-thread |
connector-producer-json-sftp-source-connector-0]
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
attempted to produce with an old epoch.
2023-03-12 11:32:45,222 INFO [json-sftp-source-connector|task-0] [Producer
clientId=connector-producer-json-sftp-source-connector-0,
transactionalId=connect-cluster-json-sftp-source-connector-0] Transiting to
fatal error state due to
org.apache.kafka.common.errors.ProducerFencedException: There is a newer
producer with the same transactionalId which fences the current one.
(org.apache.kafka.clients.producer.internals.TransactionManager)
[kafka-producer-network-thread |
connector-producer-json-sftp-source-connector-0]
2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0] [Producer
clientId=connector-producer-json-sftp-source-connector-0,
transactionalId=connect-cluster-json-sftp-source-connector-0] Aborting
producer batches due to fatal error
(org.apache.kafka.clients.producer.internals.Sender)
[kafka-producer-network-thread |
connector-producer-json-sftp-source-connector-0]
org.apache.kafka.common.errors.ProducerFencedException: There is a newer
producer with the same transactionalId which fences the current one.
2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0]
ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to
flush offsets to storage:
(org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
[kafka-producer-network-thread |
connector-producer-json-sftp-source-connector-0]
org.apache.kafka.common.errors.ProducerFencedException: There is a newer
producer with the same transactionalId which fences the current one.
2023-03-12 11:32:45,224 ERROR [json-sftp-source-connector|task-0]
ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to send
record to streams-input:
(org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
[kafka-producer-network-thread |
connector-producer-json-sftp-source-connector-0]
org.apache.kafka.common.errors.ProducerFencedException: There is a newer
producer with the same transactionalId which fences the current one.
2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0|offsets]
ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to
commit producer transaction
(org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
[task-thread-json-sftp-source-connector-0]
org.apache.kafka.common.errors.ProducerFencedException: There is a newer
producer with the same transactionalId which fences the current one.
2023-03-12 11:32:45,225 ERROR [json-sftp-source-connector|task-0]
ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Task threw an
uncaught and unrecoverable exception. Task is being killed and will not
recover until manually restarted
(org.apache.kafka.connect.runtime.WorkerTask)
[task-thread-json-sftp-source-connector-0]

Do you know why it shows an abort state even though I call commit?

I tested one more scenario. When I call the commit, I saw the below:

connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
producerId=11, producerEpoch=2, txnTimeoutMs=60000, state=*Ongoing*,
pendingState=None, topicPartitions=HashSet(streams-input-2),
txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620463834)

Then, before the state changed to abort, I dropped the next file, and I
don't see any issues; both the previous transaction and the current
transaction are committed.

Thank you for your support.

Thanks,
Nitty

On Fri, Mar 10, 2023 at 8:04 PM Chris Egerton <chr...@aiven.io.invalid>
wrote:

> Hi Nitty,
>
> > I called commitTransaction when I reached the first error record, but
> > the commit is not happening for me. Kafka Connect tries to abort the
> > transaction automatically
>
> This is really interesting--are you certain that your task never invoked
> TransactionContext::abortTransaction in this case? I'm looking over the
> code base and it seems fairly clear that the only thing that could trigger
> a call to KafkaProducer::abortTransaction is a request by the task to abort
> a transaction (either for a next batch, or for a specific record). It may
> help to run the connector in a debugger and/or look for "Aborting
> transaction for batch as requested by connector" or "Aborting transaction
> for record on topic <TOPIC NAME HERE> as requested by connector" log
> messages (which will be emitted at INFO level by
> the org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask class if
> the task is requesting an abort).
>
> Regardless, I'll work on a fix for the bug with aborting empty
> transactions. Thanks for helping uncover that one!
>
> Cheers,
>
> Chris
>
> On Thu, Mar 9, 2023 at 6:36 PM NITTY BENNY <nittybe...@gmail.com> wrote:
>
> > Hi Chris,
> >
> > We have a use case to commit the previous successful records, stop
> > processing the current file, and move on to the next file. To achieve
> > that, I called commitTransaction when I reached the first error record,
> > but the commit is not happening for me. Kafka Connect tries to abort the
> > transaction automatically; I checked the __transaction_state topic and
> > the states were marked as PrepareAbort and CompleteAbort. Do you know why
> > Kafka Connect automatically invokes abort instead of the explicit commit
> > I called?
> > Then, as a result, when it tries to parse the next file - say ABC - I saw
> > the logs "Aborting incomplete transaction" and the ERROR "Failed to send
> > record to topic", and we lost the first batch of records from the current
> > transaction in file ABC.
> >
> > Is it possible that there's a case where an abort is being requested
> > while the current transaction is empty (i.e., the task hasn't returned
> > any records from SourceTask::poll since the last transaction was
> > committed/aborted)? --- Yes, that case is possible for us. There is a
> > case where the first record itself is an error record.
> >
> > Thanks,
> > Nitty
> >
> > On Thu, Mar 9, 2023 at 3:48 PM Chris Egerton <chr...@aiven.io.invalid>
> > wrote:
> >
> > > Hi Nitty,
> > >
> > > Thanks for the code examples and the detailed explanations, this is
> > > really helpful!
> > >
> > > > Say if I have a file with 5 records and batch size is 2, and in my
> > > > 3rd batch I have one error record then in that batch, I don't have a
> > > > valid record to call commit or abort. But I want to commit all the
> > > > previous batches that were successfully parsed. How do I do that?
> > >
> > > An important thing to keep in mind with the TransactionContext API is
> > > that all records that a task returns from SourceTask::poll are
> > > implicitly included in a transaction. Invoking
> > > SourceTaskContext::transactionContext doesn't alter this or cause
> > > transactions to start being used; everything is already in a
> > > transaction, and the Connect runtime automatically begins transactions
> > > for any records it sees from the task if it hasn't already begun one.
> > > It's also valid to return a null or empty list of records from
> > > SourceTask::poll. So in this case, you can invoke
> > > transactionContext.commitTransaction() (the no-args variant) and return
> > > an empty batch from SourceTask::poll, which will cause the transaction
> > > containing the 4 valid records that were returned in the last 2 batches
> > > to be committed.
> > >
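> > > As a rough sketch only (helper names like readNextBatch() and isValid()
> > > are hypothetical, not from your connector), that flow could look like:
> > >
> > > @Override
> > > public List<SourceRecord> poll() {
> > >     TransactionContext txCtx = context.transactionContext();
> > >     List<SourceRecord> batch = readNextBatch(); // hypothetical helper
> > >     if (txCtx != null && batch != null
> > >             && batch.stream().anyMatch(r -> !isValid(r))) {
> > >         // Commit the transaction that already holds the 4 valid records
> > >         // returned from the previous two polls...
> > >         txCtx.commitTransaction();
> > >         // ...and return an empty batch, which is valid for poll().
> > >         return Collections.emptyList();
> > >     }
> > >     return batch;
> > > }
> > >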
> > > FWIW, I would be a little cautious about this approach. Many times it's
> > > better to fail fast on invalid data; it might be worth it to at least
> > > allow users to configure whether the connector fails on invalid data,
> > > or silently skips over it (which is what happens when transactions are
> > > aborted).
> > >
> > > > Why is abort not working without adding the last record to the list?
> > >
> > > Is it possible that there's a case where an abort is being requested
> > > while the current transaction is empty (i.e., the task hasn't returned
> > > any records from SourceTask::poll since the last transaction was
> > > committed/aborted)? I think this may be a bug in the Connect framework
> > > where we don't check to see if a transaction is already open when a
> > > task requests that a transaction be aborted, which can cause tasks to
> > > fail (see https://issues.apache.org/jira/browse/KAFKA-14799 for more
> > > details).
> > >
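> > > In the meantime, a possible workaround (sketch only; the tracking field
> > > is hypothetical) is for the task to only request an abort when it knows
> > > the current transaction is non-empty:
> > >
> > > private boolean recordsInCurrentTxn = false; // set true when poll() returns records
> > >
> > > // ...when the task decides to abort:
> > > if (recordsInCurrentTxn) {
> > >     context.transactionContext().abortTransaction();
> > >     recordsInCurrentTxn = false;
> > > }
> > >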
> > > Cheers,
> > >
> > > Chris
> > >
> > >
> > > On Wed, Mar 8, 2023 at 6:44 PM NITTY BENNY <nittybe...@gmail.com>
> wrote:
> > >
> > > > Hi Chris,
> > > >
> > > > I am not sure if you are able to see the images I shared with you .
> > > > Copying the code snippet below,
> > > >
> > > > if (expectedRecordCount >= 0) {
> > > >   int missingCount = expectedRecordCount - (int) this.recordOffset() - 1;
> > > >   if (missingCount > 0) {
> > > >     if (transactionContext != null) {
> > > >       isMissedRecords = true;
> > > >     } else {
> > > >       throw new DataException(String.format("Missing %d records (expecting %d, actual %d)",
> > > >           missingCount, expectedRecordCount, this.recordOffset()));
> > > >     }
> > > >   } else if (missingCount < 0) {
> > > >     if (transactionContext != null) {
> > > >       isMissedRecords = true;
> > > >     } else {
> > > >       throw new DataException(String.format("Too many records (expecting %d, actual %d)",
> > > >           expectedRecordCount, this.recordOffset()));
> > > >     }
> > > >   }
> > > > }
> > > > addLastRecord(records, null, value);
> > > >
> > > >
> > > >
> > > > // asn1 or binary abort
> > > > if ((config.parseErrorThreshold != null && parseErrorCount >= config.parseErrorThreshold
> > > >         && lastbatch && transactionContext != null)
> > > >     || (isMissedRecords && transactionContext != null && lastbatch)) {
> > > >   log.info("Transaction is aborting");
> > > >   log.info("records = {}", records);
> > > >   if (!records.isEmpty()) {
> > > >     log.info("with record");
> > > >     transactionContext.abortTransaction(records.get(records.size() - 1));
> > > >   } else {
> > > >     log.info("without record");
> > > >     transactionContext.abortTransaction();
> > > >   }
> > > > }
> > > >
> > > > Thanks,
> > > > Nitty
> > > >
> > > > On Wed, Mar 8, 2023 at 11:38 PM NITTY BENNY <nittybe...@gmail.com>
> > > wrote:
> > > >
> > > >> Hi Chris,
> > > >> Sorry for the typo in my previous email.
> > > >>
> > > >> Regarding point 2, "the task returns a batch of records from
> > > >> SourceTask::poll (and, if using the per-record API provided by the
> > > >> TransactionContext class, includes at least one record that should
> > > >> trigger a transaction commit/abort in that batch)":
> > > >> What if I am using the API without passing a record? We have 2 types
> > > >> of use cases: one where, on encountering an error record, we want to
> > > >> commit the previous successful batches and disregard the failed
> > > >> record and upcoming batches. In this case we created the
> > > >> transactionContext just before reading the file (the file is our
> > > >> transaction boundary). Say if I have a file with 5 records and batch
> > > >> size is 2, and in my 3rd batch I have one error record then in that
> > > >> batch, I don't have a valid record to call commit or abort. But I
> > > >> want to commit all the previous batches that were successfully
> > > >> parsed. How do I do that?
> > > >>
> > > >> The second use case is where I want to abort a transaction if the
> > > >> record count doesn't match.
> > > >> Code snippet:
> > > >> [image: image.png]
> > > >> There are no error records in this case. As you can see, I added the
> > > >> transactionContext check to implement exactly-once. Without the
> > > >> transaction, the code just threw the exception without calling the
> > > >> addLastRecord() method, and in the catch block it simply logged the
> > > >> message and returned the list of records, without the last record, to
> > > >> poll(). To make it work, I called addLastRecord() in this case, so the
> > > >> exception is not thrown and the list contains the last record as
> > > >> well. Then I called the abort, and everything got aborted. Why is
> > > >> abort not working without adding the last record to the list?
> > > >> [image: image.png]
> > > >>
> > > >> Code to call abort.
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> Thanks,
> > > >> Nitty
> > > >>
> > > >> On Wed, Mar 8, 2023 at 4:26 PM Chris Egerton
> <chr...@aiven.io.invalid
> > >
> > > >> wrote:
> > > >>
> > > >>> Hi Nitty,
> > > >>>
> > > >>> I'm a little confused about what you mean by this part:
> > > >>>
> > > >>> > transaction is not getting completed because it is not committing
> > > >>> > the transaction offset.
> > > >>>
> > > >>> The only conditions required for a transaction to be completed when
> > > >>> a connector is defining its own transaction boundaries are:
> > > >>>
> > > >>> 1. The task requests a transaction commit/abort from the
> > > >>> TransactionContext
> > > >>> 2. The task returns a batch of records from SourceTask::poll (and, if
> > > >>> using the per-record API provided by the TransactionContext class,
> > > >>> includes at least one record that should trigger a transaction
> > > >>> commit/abort in that batch)
> > > >>>
> > > >>> The Connect runtime should automatically commit source offsets to
> > > >>> Kafka whenever a transaction is completed, either by commit or abort.
> > > >>> This is because transactions should only be aborted for data that
> > > >>> should never be re-read by the connector; if there is a validation
> > > >>> error that should be handled by reconfiguring the connector, then the
> > > >>> task should throw an exception instead of aborting the transaction.
> > > >>>
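> > > >>> To illustrate the distinction (a sketch with hypothetical checks, not
> > > >>> code from your connector):
> > > >>>
> > > >>> if (isPermanentlyBadData(record)) {
> > > >>>     // Data that should never be re-read: abort the transaction.
> > > >>>     context.transactionContext().abortTransaction();
> > > >>> } else {
> > > >>>     // A problem fixable by reconfiguring: fail the task instead.
> > > >>>     throw new ConnectException("Validation failed; check the connector config");
> > > >>> }
> > > >>>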
> > > >>> If possible, do you think you could provide a brief code snippet
> > > >>> illustrating what your task is doing that's causing issues?
> > > >>>
> > > >>> Cheers,
> > > >>>
> > > >>> Chris (not Chrise 🙂)
> > > >>>
> > > >>> On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY <nittybe...@gmail.com>
> > > >>> wrote:
> > > >>>
> > > >>> > Hi Chrise,
> > > >>> >
> > > >>> > Thanks for sharing the details.
> > > >>> >
> > > >>> > Regarding the use case: for the Asn1 source connector, we have a
> > > >>> > use case to validate the number of records in the file against the
> > > >>> > number of records in the header. Currently, if validation fails we
> > > >>> > do not send the last record to the topic. But after introducing
> > > >>> > exactly-once with the connector transaction boundary, I can see
> > > >>> > that if I call an abort when the validation fails, the transaction
> > > >>> > is not getting completed because it is not committing the
> > > >>> > transaction offset. I saw that the transaction state changed to
> > > >>> > CompleteAbort. So for my next transaction I am getting
> > > >>> > InvalidProducerEpochException, and the task stopped after that.
> > > >>> > When I tried calling the abort after sending the last record to the
> > > >>> > topic, the transaction got completed.
> > > >>> >
> > > >>> > I don't know if I am doing anything wrong here.
> > > >>> >
> > > >>> > Please advise.
> > > >>> > Thanks,
> > > >>> > Nitty
> > > >>> >
> > > >>> > On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton
> > > <chr...@aiven.io.invalid
> > > >>> >
> > > >>> > wrote:
> > > >>> >
> > > >>> > > Hi Nitty,
> > > >>> > >
> > > >>> > > We've recently added some documentation on implementing
> > > >>> > > exactly-once source connectors here:
> > > >>> > > https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors
> > > >>> > > To quote a relevant passage from those docs:
> > > >>> > >
> > > >>> > > > In order for a source connector to take advantage of this
> > > >>> > > > support, it must be able to provide meaningful source offsets
> > > >>> > > > for each record that it emits, and resume consumption from the
> > > >>> > > > external system at the exact position corresponding to any of
> > > >>> > > > those offsets without dropping or duplicating messages.
> > > >>> > >
> > > >>> > > So, as long as your source connector is able to use the Kafka
> > > >>> > > Connect framework's offsets API correctly, it shouldn't be
> > > >>> > > necessary to make any other code changes to the connector.
> > > >>> > >
> > > >>> > > To enable exactly-once support for source connectors on your
> > > >>> > > Connect cluster, see the docs section here:
> > > >>> > > https://kafka.apache.org/documentation/#connect_exactlyoncesource
> > > >>> > >
> > > >>> > > With regard to transactions, a transactional producer is always
> > > >>> > > created automatically for your connector by the Connect runtime
> > > >>> > > when exactly-once support is enabled on the worker. The only
> > > >>> > > reason to set "transaction.boundary" to "connector" is if your
> > > >>> > > connector would like to explicitly define its own transaction
> > > >>> > > boundaries. In this case, it sounds like that may be what you
> > > >>> > > want; I just want to make sure to call out that in either case,
> > > >>> > > you should not be directly instantiating a producer in your
> > > >>> > > connector code, but let the Kafka Connect runtime do that for
> > > >>> > > you, and just worry about returning the right records from
> > > >>> > > SourceTask::poll (and possibly defining custom transactions using
> > > >>> > > the TransactionContext API).
> > > >>> > >
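> > > >>> > > As a minimal, hypothetical sketch of that division of labor (the
> > > >>> > > class name is illustrative):
> > > >>> > >
> > > >>> > > public class MySourceTask extends SourceTask {
> > > >>> > >   private TransactionContext txCtx;
> > > >>> > >
> > > >>> > >   @Override
> > > >>> > >   public void start(Map<String, String> props) {
> > > >>> > >     // Provided by the runtime; never instantiate a producer here.
> > > >>> > >     // Returns null unless transaction.boundary is "connector".
> > > >>> > >     txCtx = context.transactionContext();
> > > >>> > >   }
> > > >>> > > }
> > > >>> > >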
> > > >>> > > With respect to your question about committing or aborting in
> > > >>> > > certain circumstances, it'd be useful to know more about your use
> > > >>> > > case, since it may not be necessary to define custom transaction
> > > >>> > > boundaries in your connector at all.
> > > >>> > >
> > > >>> > > Cheers,
> > > >>> > >
> > > >>> > > Chris
> > > >>> > >
> > > >>> > >
> > > >>> > >
> > > >>> > > On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY <
> nittybe...@gmail.com
> > >
> > > >>> wrote:
> > > >>> > >
> > > >>> > > > Hi Team,
> > > >>> > > >
> > > >>> > > > Adding on top of this, I tried creating a TransactionContext
> > > >>> > > > object and calling the commitTransaction and abortTransaction
> > > >>> > > > methods in source connectors.
> > > >>> > > > But the main problem I saw is that if there is any error while
> > > >>> > > > parsing the record, Connect is calling an abort, but we have a
> > > >>> > > > use case to call commit in some cases. Is that a valid use case
> > > >>> > > > in terms of Kafka Connect?
> > > >>> > > >
> > > >>> > > > Another question - should I use a transactional producer
> > > >>> > > > instead of creating an object of TransactionContext? Below is
> > > >>> > > > the connector configuration I am using.
> > > >>> > > >
> > > >>> > > >   exactly.once.support: "required"
> > > >>> > > >   transaction.boundary: "connector"
> > > >>> > > >
> > > >>> > > > Could you please help me here?
> > > >>> > > >
> > > >>> > > > Thanks,
> > > >>> > > > Nitty
> > > >>> > > >
> > > >>> > > > On Tue, Mar 7, 2023 at 12:29 AM NITTY BENNY <
> > > nittybe...@gmail.com>
> > > >>> > > wrote:
> > > >>> > > >
> > > >>> > > > > Hi Team,
> > > >>> > > > > I am trying to implement exactly-once behavior in our source
> > > >>> > > > > connector. Is there any sample source connector
> > > >>> > > > > implementation available to have a look at?
> > > >>> > > > > Regards,
> > > >>> > > > > Nitty
> > > >>> > > > >
> > > >>> > > >
> > > >>> > >
> > > >>> >
> > > >>>
> > > >>
> > >
> >
>
