Hi Chris,

The issue mentioned below is happening for the JSON connector only. Is there
any difference between the asn1, binary, csv, and json connectors?

Thanks,
Nitty

On Mon, Mar 13, 2023 at 9:16 AM NITTY BENNY <nittybe...@gmail.com> wrote:

> Hi Chris,
>
> Sorry Chris, I am not able to reproduce the above issue.
>
> I want to share one more use case I found.
> In the first batch the task returns 2 valid records, and the next batch
> contains an invalid record. Below is the transaction_state topic content
> when I call a commit while processing the invalid record.
>
> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
> producerId=11, producerEpoch=2, txnTimeoutMs=60000, state=*Ongoing*,
> pendingState=None, topicPartitions=HashSet(streams-input-2),
> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620463834)
>
> then after some time I saw the below states as well,
>
> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
> producerId=11, producerEpoch=3, txnTimeoutMs=60000, state=*PrepareAbort*,
> pendingState=None, topicPartitions=HashSet(streams-input-2),
> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526119)
> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
> producerId=11, producerEpoch=3, txnTimeoutMs=60000, state=*CompleteAbort*,
> pendingState=None, topicPartitions=HashSet(),
> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526121)
>
> Later, for the next transaction, when the first batch is returned, these
> are the logs I can see:
>
>  Transiting to abortable error state due to
> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
> attempted to produce with an old epoch.
> (org.apache.kafka.clients.producer.internals.TransactionManager)
> [kafka-producer-network-thread |
> connector-producer-json-sftp-source-connector-0]
> 2023-03-12 11:32:45,220 ERROR [json-sftp-source-connector|task-0]
> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to send
> record to streams-input:
> (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
> [kafka-producer-network-thread |
> connector-producer-json-sftp-source-connector-0]
> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
> attempted to produce with an old epoch.
> 2023-03-12 11:32:45,222 INFO [json-sftp-source-connector|task-0] [Producer
> clientId=connector-producer-json-sftp-source-connector-0,
> transactionalId=connect-cluster-json-sftp-source-connector-0] Transiting to
> fatal error state due to
> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
> producer with the same transactionalId which fences the current one.
> (org.apache.kafka.clients.producer.internals.TransactionManager)
> [kafka-producer-network-thread |
> connector-producer-json-sftp-source-connector-0]
> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0]
> [Producer clientId=connector-producer-json-sftp-source-connector-0,
> transactionalId=connect-cluster-json-sftp-source-connector-0] Aborting
> producer batches due to fatal error
> (org.apache.kafka.clients.producer.internals.Sender)
> [kafka-producer-network-thread |
> connector-producer-json-sftp-source-connector-0]
> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
> producer with the same transactionalId which fences the current one.
> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0]
> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to
> flush offsets to storage:
> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
> [kafka-producer-network-thread |
> connector-producer-json-sftp-source-connector-0]
> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
> producer with the same transactionalId which fences the current one.
> 2023-03-12 11:32:45,224 ERROR [json-sftp-source-connector|task-0]
> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to send
> record to streams-input:
> (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
> [kafka-producer-network-thread |
> connector-producer-json-sftp-source-connector-0]
> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
> producer with the same transactionalId which fences the current one.
> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0|offsets]
> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to
> commit producer transaction
> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
> [task-thread-json-sftp-source-connector-0]
> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
> producer with the same transactionalId which fences the current one.
> 2023-03-12 11:32:45,225 ERROR [json-sftp-source-connector|task-0]
> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Task threw an
> uncaught and unrecoverable exception. Task is being killed and will not
> recover until manually restarted
> (org.apache.kafka.connect.runtime.WorkerTask)
> [task-thread-json-sftp-source-connector-0]
>
> Do you know why it shows an abort state even though I call commit?
>
> I tested one more scenario. When I call the commit, I saw the state below:
> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
> producerId=11, producerEpoch=2, txnTimeoutMs=60000, state=*Ongoing*,
> pendingState=None, topicPartitions=HashSet(streams-input-2),
> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620463834)
> Then, before the state changed to abort, I dropped the next file, and I
> don't see any issues; the previous transaction as well as the current
> transaction are committed.
>
> Thank you for your support.
>
> Thanks,
> Nitty
>
> On Fri, Mar 10, 2023 at 8:04 PM Chris Egerton <chr...@aiven.io.invalid>
> wrote:
>
>> Hi Nitty,
>>
>> > I called commitTransaction when I reach the first error record, but
>> commit is not happening for me. Kafka connect tries to abort the
>> transaction automatically
>>
>> This is really interesting--are you certain that your task never invoked
>> TransactionContext::abortTransaction in this case? I'm looking over the
>> code base and it seems fairly clear that the only thing that could trigger
>> a call to KafkaProducer::abortTransaction is a request by the task to
>> abort
>> a transaction (either for a next batch, or for a specific record). It may
>> help to run the connector in a debugger and/or look for "Aborting
>> transaction for batch as requested by connector" or "Aborting transaction
>> for record on topic <TOPIC NAME HERE> as requested by connector" log
>> messages (which will be emitted at INFO level by
>> the org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask class if
>> the task is requesting an abort).
>>
>> Regardless, I'll work on a fix for the bug with aborting empty
>> transactions. Thanks for helping uncover that one!
>>
>> Cheers,
>>
>> Chris
>>
>> On Thu, Mar 9, 2023 at 6:36 PM NITTY BENNY <nittybe...@gmail.com> wrote:
>>
>> > Hi Chris,
>> >
>> > We have a use case to commit the previously successful records, stop
>> > processing the current file, and move on to the next file. To achieve
>> > that I called commitTransaction when I reach the first error record,
>> > but the commit is not happening for me. Kafka Connect tries to abort the
>> > transaction automatically; I checked the _transaction_state topic and
>> > the states are marked as PrepareAbort and CompleteAbort. Do you know why
>> > Kafka Connect automatically invokes abort instead of the commit I called
>> > explicitly?
>> > Then, as a result, when I try to parse the next file - say ABC - I saw
>> > the log "Aborting incomplete transaction" and the error "Failed to send
>> > record to topic", and we lost the first batch of records from the
>> > current transaction in the file ABC.
>> >
>> > Is it possible that there's a case where an abort is being requested
>> > while the current transaction is empty (i.e., the task hasn't returned
>> > any records from SourceTask::poll since the last transaction was
>> > committed/aborted)? --- Yes, that case is possible for us. There is a
>> > case where the first record itself is an error record.
>> >
>> > Thanks,
>> > Nitty
>> >
>> > On Thu, Mar 9, 2023 at 3:48 PM Chris Egerton <chr...@aiven.io.invalid>
>> > wrote:
>> >
>> > > Hi Nitty,
>> > >
>> > > Thanks for the code examples and the detailed explanations, this is
>> > really
>> > > helpful!
>> > >
>> > > > Say if I have a file with 5 records and batch size is 2, and in my
>> 3rd
>> > > batch I have one error record then in that batch, I dont have a valid
>> > > record to call commit or abort. But I want to commit all the previous
>> > > batches that were successfully parsed. How do I do that?
>> > >
>> > > An important thing to keep in mind with the TransactionContext API is
>> > that
>> > > all records that a task returns from SourceTask::poll are implicitly
>> > > included in a transaction. Invoking
>> SourceTaskContext::transactionContext
>> > > doesn't alter this or cause transactions to start being used;
>> everything
>> > is
>> > > already in a transaction, and the Connect runtime automatically begins
>> > > transactions for any records it sees from the task if it hasn't
>> already
>> > > begun one. It's also valid to return a null or empty list of records
>> from
>> > > SourceTask::poll. So in this case, you can invoke
>> > > transactionContext.commitTransaction() (the no-args variant) and
>> return
>> > an
>> > > empty batch from SourceTask::poll, which will cause the transaction
>> > > containing the 4 valid records that were returned in the last 2
>> batches
>> > to
>> > > be committed.
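>> > >
>> > > If it helps, here is a minimal sketch of that approach inside poll().
>> > > readNextBatch and isInvalid are made-up helpers standing in for your
>> > > parsing/validation logic, and transactionContext is assumed to come
>> > > from context.transactionContext() in start():
>> > >
>> > >   @Override
>> > >   public List<SourceRecord> poll() throws InterruptedException {
>> > >     List<SourceRecord> batch = readNextBatch();
>> > >     for (SourceRecord record : batch) {
>> > >       if (isInvalid(record)) {
>> > >         // Commit the transaction holding the records returned from
>> > >         // earlier polls, and return an empty batch so nothing new is
>> > >         // added to it.
>> > >         if (transactionContext != null) {
>> > >           transactionContext.commitTransaction();
>> > >         }
>> > >         return Collections.emptyList();
>> > >       }
>> > >     }
>> > >     return batch;
>> > >   }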
>> > >
>> > > FWIW, I would be a little cautious about this approach. Many times
>> it's
>> > > better to fail fast on invalid data; it might be worth it to at least
>> > allow
>> > > users to configure whether the connector fails on invalid data, or
>> > silently
>> > > skips over it (which is what happens when transactions are aborted).
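>> > >
>> > > For example (purely illustrative - the property name and values below
>> > > are invented, not an existing Connect setting), the connector could
>> > > expose something like:
>> > >
>> > >   static final ConfigDef CONFIG_DEF = new ConfigDef()
>> > >       .define("on.invalid.record", ConfigDef.Type.STRING, "fail",
>> > >           ConfigDef.ValidString.in("fail", "skip"),
>> > >           ConfigDef.Importance.MEDIUM,
>> > >           "Whether to fail the task or abort the transaction (skipping "
>> > >               + "the data) when an invalid record is encountered.");
>> > >
>> > > and then decide between throwing an exception and requesting an abort
>> > > based on that value.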
>> > >
>> > > > Why is abort not working without adding the last record to the list?
>> > >
>> > > Is it possible that there's a case where an abort is being requested
>> > while
>> > > the current transaction is empty (i.e., the task hasn't returned any
>> > > records from SourceTask::poll since the last transaction was
>> > > committed/aborted)? I think this may be a bug in the Connect framework
>> > > where we don't check to see if a transaction is already open when a
>> task
>> > > requests that a transaction be aborted, which can cause tasks to fail
>> > (see
>> > > https://issues.apache.org/jira/browse/KAFKA-14799 for more details).
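>> > >
>> > > Until that's fixed, a possible workaround (just a sketch - the
>> > > recordsSinceLastTransaction counter is something your task would have
>> > > to maintain itself) is to skip the abort request when the current
>> > > transaction is known to be empty:
>> > >
>> > >   // Only request an abort when the current transaction actually
>> > >   // contains records; otherwise skip the request entirely.
>> > >   if (transactionContext != null && recordsSinceLastTransaction > 0) {
>> > >     transactionContext.abortTransaction();
>> > >   }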
>> > >
>> > > Cheers,
>> > >
>> > > Chris
>> > >
>> > >
>> > > On Wed, Mar 8, 2023 at 6:44 PM NITTY BENNY <nittybe...@gmail.com>
>> wrote:
>> > >
>> > > > Hi Chris,
>> > > >
>> > > > I am not sure if you are able to see the images I shared with you .
>> > > > Copying the code snippet below,
>> > > >
>> > > >   if (expectedRecordCount >= 0) {
>> > > >     int missingCount = expectedRecordCount - (int) this.recordOffset() - 1;
>> > > >     if (missingCount > 0) {
>> > > >       if (transactionContext != null) {
>> > > >         isMissedRecords = true;
>> > > >       } else {
>> > > >         throw new DataException(String.format("Missing %d records (expecting %d, actual %d)",
>> > > >             missingCount, expectedRecordCount, this.recordOffset()));
>> > > >       }
>> > > >     } else if (missingCount < 0) {
>> > > >       if (transactionContext != null) {
>> > > >         isMissedRecords = true;
>> > > >       } else {
>> > > >         throw new DataException(String.format("Too many records (expecting %d, actual %d)",
>> > > >             expectedRecordCount, this.recordOffset()));
>> > > >       }
>> > > >     }
>> > > >   }
>> > > >   addLastRecord(records, null, value);
>> > > > }
>> > > >
>> > > > // asn1 or binary abort
>> > > > if ((config.parseErrorThreshold != null && parseErrorCount >= config.parseErrorThreshold
>> > > >     && lastbatch && transactionContext != null)
>> > > >     || (isMissedRecords && transactionContext != null && lastbatch)) {
>> > > >   log.info("Transaction is aborting");
>> > > >   log.info("records = {}", records);
>> > > >   if (!records.isEmpty()) {
>> > > >     log.info("with record");
>> > > >     transactionContext.abortTransaction(records.get(records.size() - 1));
>> > > >   } else {
>> > > >     log.info("without record");
>> > > >     transactionContext.abortTransaction();
>> > > >   }
>> > > >
>> > > > Thanks,
>> > > > Nitty
>> > > >
>> > > > On Wed, Mar 8, 2023 at 11:38 PM NITTY BENNY <nittybe...@gmail.com>
>> > > wrote:
>> > > >
>> > > >> Hi Chris,
>> > > >> Sorry for the typo in my previous email.
>> > > >>
>> > > >> Regarding point 2, *the task returns a batch of records from
>> > > >> SourceTask::poll (and, if using the per-record API provided by the
>> > > >> TransactionContext class, includes at least one record that should
>> > > >> trigger a transaction commit/abort in that batch)*
>> > > >> What if I am using the API without passing a record? We have 2 types
>> > > >> of use cases, one where, on encountering an error record, we want to
>> > > >> commit previous successful batches and disregard the failed record
>> > > >> and upcoming batches. In this case we created the transactionContext
>> > > >> just before reading the file (the file is our transaction boundary).
>> > > >> Say I have a file with 5 records and the batch size is 2, and in my
>> > > >> 3rd batch I have one error record; then in that batch I don't have a
>> > > >> valid record to call commit or abort. But I want to commit all the
>> > > >> previous batches that were successfully parsed. How do I do that?
>> > > >>
>> > > >> The second use case is where I want to abort a transaction if the
>> > > >> record count doesn't match.
>> > > >> Code Snippet :
>> > > >> [image: image.png]
>> > > >> There are no error records in this case. As you can see, I added the
>> > > >> transactionContext check to implement exactly-once; without
>> > > >> transactions it just threw the exception without calling the
>> > > >> addLastRecord() method, and in the catch block it just logs the
>> > > >> message and returns the list of records, without the last record, to
>> > > >> poll(). To make it work, I called the addLastRecord() method in this
>> > > >> case, so it does not throw the exception and the list has the last
>> > > >> record as well. Then I called the abort and everything got aborted.
>> > > >> Why is abort not working without adding the last record to the list?
>> > > >> [image: image.png]
>> > > >>
>> > > >> Code to call abort.
>> > > >>
>> > > >>
>> > > >>
>> > > >>
>> > > >> Thanks,
>> > > >> Nitty
>> > > >>
>> > > >> On Wed, Mar 8, 2023 at 4:26 PM Chris Egerton
>> <chr...@aiven.io.invalid
>> > >
>> > > >> wrote:
>> > > >>
>> > > >>> Hi Nitty,
>> > > >>>
>> > > >>> I'm a little confused about what you mean by this part:
>> > > >>>
>> > > >>> > transaction is not getting completed because it is not commiting
>> > the
>> > > >>> transaction offest.
>> > > >>>
>> > > >>> The only conditions required for a transaction to be completed
>> when a
>> > > >>> connector is defining its own transaction boundaries are:
>> > > >>>
>> > > >>> 1. The task requests a transaction commit/abort from the
>> > > >>> TransactionContext
>> > > >>> 2. The task returns a batch of records from SourceTask::poll
>> (and, if
>> > > >>> using
>> > > >>> the per-record API provided by the TransactionContext class,
>> includes
>> > > at
>> > > >>> least one record that should trigger a transaction commit/abort in
>> > that
>> > > >>> batch)
>> > > >>>
>> > > >>> The Connect runtime should automatically commit source offsets to
>> > Kafka
>> > > >>> whenever a transaction is completed, either by commit or abort.
>> This
>> > is
>> > > >>> because transactions should only be aborted for data that should
>> > never
>> > > be
>> > > >>> re-read by the connector; if there is a validation error that
>> should
>> > be
>> > > >>> handled by reconfiguring the connector, then the task should
>> throw an
>> > > >>> exception instead of aborting the transaction.
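>> > > >>>
>> > > >>> In other words, something roughly like this (the shouldSkipForever
>> > > >>> flag is just a stand-in for however your task classifies the
>> > > >>> failure):
>> > > >>>
>> > > >>>   if (shouldSkipForever) {
>> > > >>>     // Bad data that must never be re-read: abort, and the source
>> > > >>>     // offsets for the aborted records are still committed.
>> > > >>>     transactionContext.abortTransaction();
>> > > >>>   } else {
>> > > >>>     // A problem a user should fix by reconfiguring: fail the task instead.
>> > > >>>     throw new ConnectException("Validation failed; check the connector configuration");
>> > > >>>   }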
>> > > >>>
>> > > >>> If possible, do you think you could provide a brief code snippet
>> > > >>> illustrating what your task is doing that's causing issues?
>> > > >>>
>> > > >>> Cheers,
>> > > >>>
>> > > >>> Chris (not Chrise 🙂)
>> > > >>>
>> > > >>> On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY <nittybe...@gmail.com
>> >
>> > > >>> wrote:
>> > > >>>
>> > > >>> > Hi Chrise,
>> > > >>> >
>> > > >>> > Thanks for sharing the details.
>> > > >>> >
>> > > >>> > Regarding the use case: for the Asn1 source connector we have a
>> > > >>> > use case to validate the number of records in the file against the
>> > > >>> > number of records in the header. Currently, if validation fails we
>> > > >>> > do not send the last record to the topic. But after introducing
>> > > >>> > exactly-once with the connector transaction boundary, I can see
>> > > >>> > that if I call an abort when the validation fails, the transaction
>> > > >>> > is not getting completed because it is not committing the
>> > > >>> > transaction offset. I saw that the transaction state changed to
>> > > >>> > CompleteAbort. So for my next transaction I am getting
>> > > >>> > InvalidProducerEpochException, and the task stopped after that. I
>> > > >>> > tried calling the abort after sending the last record to the
>> > > >>> > topic, and then the transaction got completed.
>> > > >>> >
>> > > >>> > I don't know if I am doing anything wrong here.
>> > > >>> >
>> > > >>> > Please advise.
>> > > >>> > Thanks,
>> > > >>> > Nitty
>> > > >>> >
>> > > >>> > On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton
>> > > <chr...@aiven.io.invalid
>> > > >>> >
>> > > >>> > wrote:
>> > > >>> >
>> > > >>> > > Hi Nitty,
>> > > >>> > >
>> > > >>> > > We've recently added some documentation on implementing
>> > > exactly-once
>> > > >>> > source
>> > > >>> > > connectors here:
>> > > >>> > >
>> > > >>> >
>> > > >>>
>> > >
>> >
>> https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors
>> > > >>> > > .
>> > > >>> > > To quote a relevant passage from those docs:
>> > > >>> > >
>> > > >>> > > > In order for a source connector to take advantage of this
>> > > support,
>> > > >>> it
>> > > >>> > > must be able to provide meaningful source offsets for each
>> record
>> > > >>> that it
>> > > >>> > > emits, and resume consumption from the external system at the
>> > exact
>> > > >>> > > position corresponding to any of those offsets without
>> dropping
>> > or
>> > > >>> > > duplicating messages.
>> > > >>> > >
>> > > >>> > > So, as long as your source connector is able to use the Kafka
>> > > Connect
>> > > >>> > > framework's offsets API correctly, it shouldn't be necessary
>> to
>> > > make
>> > > >>> any
>> > > >>> > > other code changes to the connector.
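>> > > >>> > >
>> > > >>> > > As a rough illustration (the field names here are just an
>> > > >>> > > example for a file-based source, not something the framework
>> > > >>> > > requires):
>> > > >>> > >
>> > > >>> > >   Map<String, Object> sourcePartition = Collections.singletonMap("filename", fileName);
>> > > >>> > >   Map<String, Object> sourceOffset = Collections.singletonMap("position", recordNumber);
>> > > >>> > >   SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, topic, valueSchema, value);
>> > > >>> > >
>> > > >>> > >   // On restart, the task can look up where it left off:
>> > > >>> > >   Map<String, Object> lastOffset = context.offsetStorageReader().offset(sourcePartition);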
>> > > >>> > >
>> > > >>> > > To enable exactly-once support for source connectors on your
>> > > Connect
>> > > >>> > > cluster, see the docs section here:
>> > > >>> > >
>> > https://kafka.apache.org/documentation/#connect_exactlyoncesource
>> > > >>> > >
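>> > > >>> > > (For reference - this is just a summary of the linked section -
>> > > >>> > > the worker-side property involved is exactly.once.source.support,
>> > > >>> > > which is first set to "preparing" on every worker in one rolling
>> > > >>> > > restart and then to "enabled" in a second one:)
>> > > >>> > >
>> > > >>> > >   # worker configuration (e.g. connect-distributed.properties)
>> > > >>> > >   exactly.once.source.support=enabled
>> > > >>> > >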
>> > > >>> > > With regard to transactions, a transactional producer is
>> always
>> > > >>> created
>> > > >>> > > automatically for your connector by the Connect runtime when
>> > > >>> exactly-once
>> > > >>> > > support is enabled on the worker. The only reason to set
>> > > >>> > > "transaction.boundary" to "connector" is if your connector
>> would
>> > > >>> like to
>> > > >>> > > explicitly define its own transaction boundaries. In this
>> case,
>> > it
>> > > >>> sounds
>> > > >>> > > like may be what you want; I just want to make sure to call
>> out
>> > > that
>> > > >>> in
>> > > >>> > > either case, you should not be directly instantiating a
>> producer
>> > in
>> > > >>> your
>> > > >>> > > connector code, but let the Kafka Connect runtime do that for
>> > you,
>> > > >>> and
>> > > >>> > just
>> > > >>> > > worry about returning the right records from SourceTask::poll
>> > (and
>> > > >>> > possibly
>> > > >>> > > defining custom transactions using the TransactionContext
>> API).
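>> > > >>> > >
>> > > >>> > > A small sketch of that wiring (the field name is arbitrary; the
>> > > >>> > > context field is the one provided by SourceTask):
>> > > >>> > >
>> > > >>> > >   private TransactionContext transactionContext;
>> > > >>> > >
>> > > >>> > >   @Override
>> > > >>> > >   public void start(Map<String, String> props) {
>> > > >>> > >     // Null when exactly-once support is disabled or the connector
>> > > >>> > >     // is not defining its own transaction boundaries.
>> > > >>> > >     transactionContext = context.transactionContext();
>> > > >>> > >   }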
>> > > >>> > >
>> > > >>> > > With respect to your question about committing or aborting in
>> > > certain
>> > > >>> > > circumstances, it'd be useful to know more about your use
>> case,
>> > > >>> since it
>> > > >>> > > may not be necessary to define custom transaction boundaries
>> in
>> > > your
>> > > >>> > > connector at all.
>> > > >>> > >
>> > > >>> > > Cheers,
>> > > >>> > >
>> > > >>> > > Chris
>> > > >>> > >
>> > > >>> > >
>> > > >>> > >
>> > > >>> > > On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY <
>> nittybe...@gmail.com
>> > >
>> > > >>> wrote:
>> > > >>> > >
>> > > >>> > > > Hi Team,
>> > > >>> > > >
>> > > >>> > > > Adding on top of this, I tried creating a TransactionContext
>> > > >>> > > > object and calling the commitTransaction and abortTransaction
>> > > >>> > > > methods in source connectors.
>> > > >>> > > > But the main problem I saw is that if there is any error while
>> > > >>> > > > parsing the record, Connect is calling an abort, but we have a
>> > > >>> > > > use case to call commit in some cases. Is that a valid use case
>> > > >>> > > > in terms of Kafka Connect?
>> > > >>> > > >
>> > > >>> > > > Another question - should I use a transactional producer
>> > > >>> > > > instead of creating an object of TransactionContext? Below is
>> > > >>> > > > the connector configuration I am using.
>> > > >>> > > >
>> > > >>> > > >   exactly.once.support: "required"
>> > > >>> > > >   transaction.boundary: "connector"
>> > > >>> > > >
>> > > >>> > > > Could you please help me here?
>> > > >>> > > >
>> > > >>> > > > Thanks,
>> > > >>> > > > Nitty
>> > > >>> > > >
>> > > >>> > > > On Tue, Mar 7, 2023 at 12:29 AM NITTY BENNY <
>> > > nittybe...@gmail.com>
>> > > >>> > > wrote:
>> > > >>> > > >
>> > > >>> > > > > Hi Team,
>> > > >>> > > > > I am trying to implement exactly once behavior in our
>> source
>> > > >>> > connector.
>> > > >>> > > > Is
>> > > >>> > > > > there any sample source connector implementation
>> available to
>> > > >>> have a
>> > > >>> > > look
>> > > >>> > > > > at?
>> > > >>> > > > > Regards,
>> > > >>> > > > > Nitty
>> > > >>> > > > >
>> > > >>> > > >
>> > > >>> > >
>> > > >>> >
>> > > >>>
>> > > >>
>> > >
>> >
>>
>
