Hi Chris,

The issue mentioned below is happening for the JSON connector only. Is there any difference between the asn1, binary, csv, and json connectors?
Thanks,
Nitty

On Mon, Mar 13, 2023 at 9:16 AM NITTY BENNY <nittybe...@gmail.com> wrote:

> Hi Chris,
>
> Sorry Chris, I am not able to reproduce the above issue.
>
> I want to share with you one more use case I found. In this use case, the first batch returns 2 valid records, and then the next batch has an invalid record. Below is the transaction_state topic content when I call a commit while processing an invalid record.
>
> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0, producerId=11, producerEpoch=2, txnTimeoutMs=60000, state=*Ongoing*, pendingState=None, topicPartitions=HashSet(streams-input-2), txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620463834)
>
> Then after some time I saw the below states as well:
>
> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0, producerId=11, producerEpoch=3, txnTimeoutMs=60000, state=*PrepareAbort*, pendingState=None, topicPartitions=HashSet(streams-input-2), txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526119)
> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0, producerId=11, producerEpoch=3, txnTimeoutMs=60000, state=*CompleteAbort*, pendingState=None, topicPartitions=HashSet(), txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526121)
>
> Later, for the next transaction, below are the logs I can see when it returns the first batch:
>
> Transiting to abortable error state due to org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch. (org.apache.kafka.clients.producer.internals.TransactionManager) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
> 2023-03-12 11:32:45,220 ERROR [json-sftp-source-connector|task-0] ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to send record to streams-input: (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
> 2023-03-12 11:32:45,222 INFO [json-sftp-source-connector|task-0] [Producer clientId=connector-producer-json-sftp-source-connector-0, transactionalId=connect-cluster-json-sftp-source-connector-0] Transiting to fatal error state due to org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one. (org.apache.kafka.clients.producer.internals.TransactionManager) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0] [Producer clientId=connector-producer-json-sftp-source-connector-0, transactionalId=connect-cluster-json-sftp-source-connector-0] Aborting producer batches due to fatal error (org.apache.kafka.clients.producer.internals.Sender) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
> org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0] ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to flush offsets to storage: (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
> org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
> 2023-03-12 11:32:45,224 ERROR [json-sftp-source-connector|task-0] ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to send record to streams-input: (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
> org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0|offsets] ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to commit producer transaction (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask) [task-thread-json-sftp-source-connector-0]
> org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
> 2023-03-12 11:32:45,225 ERROR [json-sftp-source-connector|task-0] ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-json-sftp-source-connector-0]
>
> Do you know why it is showing an abort state even though I called commit?
>
> I tested one more scenario. When I called the commit, I saw the below:
>
> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0, producerId=11, producerEpoch=2, txnTimeoutMs=60000, state=*Ongoing*, pendingState=None, topicPartitions=HashSet(streams-input-2), txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620463834)
>
> Then, before the states changed to abort, I dropped the next file, and I don't see any issues: the previous transaction as well as the current transaction are committed.
>
> Thank you for your support.
>
> Thanks,
> Nitty
>
> On Fri, Mar 10, 2023 at 8:04 PM Chris Egerton <chr...@aiven.io.invalid> wrote:
>
>> Hi Nitty,
>>
>> > I called commitTransaction when I reach the first error record, but commit is not happening for me. Kafka connect tries to abort the transaction automatically
>>
>> This is really interesting--are you certain that your task never invoked TransactionContext::abortTransaction in this case? I'm looking over the code base and it seems fairly clear that the only thing that could trigger a call to KafkaProducer::abortTransaction is a request by the task to abort a transaction (either for a next batch, or for a specific record). It may help to run the connector in a debugger and/or look for "Aborting transaction for batch as requested by connector" or "Aborting transaction for record on topic <TOPIC NAME HERE> as requested by connector" log messages (which will be emitted at INFO level by the org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask class if the task is requesting an abort).
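>>
>> For reference, those are the only two task-initiated abort paths; a minimal sketch of what each looks like in task code (here "errorRecord" is a hypothetical name for a record in the batch your task is about to return, not something from your connector):
>>
>>     TransactionContext transactionContext = context.transactionContext();
>>     // Variant 1: abort the transaction containing the next batch returned from poll()
>>     transactionContext.abortTransaction();
>>     // Variant 2: abort the transaction once this specific record has been processed
>>     transactionContext.abortTransaction(errorRecord);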
>> Regardless, I'll work on a fix for the bug with aborting empty transactions. Thanks for helping uncover that one!
>>
>> Cheers,
>>
>> Chris
>>
>> On Thu, Mar 9, 2023 at 6:36 PM NITTY BENNY <nittybe...@gmail.com> wrote:
>>
>> > Hi Chris,
>> >
>> > We have a use case to commit previous successful records, stop the processing of the current file, and move on with the next file. To achieve that I called commitTransaction when I reached the first error record, but the commit is not happening for me. Kafka Connect tries to abort the transaction automatically; I checked the _transaction_state topic and the states are marked as PrepareAbort and CompleteAbort. Do you know why Kafka Connect automatically invokes abort instead of the commit I explicitly called?
>> > Then as a result, when I try to parse the next file - say ABC, I saw the logs "Aborting incomplete transaction" and ERROR: "Failed to send record to topic", and we lost the first batch of records from the current transaction in the file ABC.
>> >
>> > Is it possible that there's a case where an abort is being requested while the current transaction is empty (i.e., the task hasn't returned any records from SourceTask::poll since the last transaction was committed/aborted)? --- Yes, that case is possible for us. There is a case where the first record itself is an error record.
>> >
>> > Thanks,
>> > Nitty
>> >
>> > On Thu, Mar 9, 2023 at 3:48 PM Chris Egerton <chr...@aiven.io.invalid> wrote:
>> >
>> > > Hi Nitty,
>> > >
>> > > Thanks for the code examples and the detailed explanations, this is really helpful!
>> > >
>> > > > Say if I have a file with 5 records and batch size is 2, and in my 3rd batch I have one error record; then in that batch I don't have a valid record to call commit or abort. But I want to commit all the previous batches that were successfully parsed. How do I do that?
>> > >
>> > > An important thing to keep in mind with the TransactionContext API is that all records that a task returns from SourceTask::poll are implicitly included in a transaction. Invoking SourceTaskContext::transactionContext doesn't alter this or cause transactions to start being used; everything is already in a transaction, and the Connect runtime automatically begins transactions for any records it sees from the task if it hasn't already begun one. It's also valid to return a null or empty list of records from SourceTask::poll. So in this case, you can invoke transactionContext.commitTransaction() (the no-args variant) and return an empty batch from SourceTask::poll, which will cause the transaction containing the 4 valid records that were returned in the last 2 batches to be committed.
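>> > >
>> > > A minimal sketch of that approach (the readNextBatch() and containsInvalidRecord() helpers are hypothetical stand-ins for your parsing logic, not real API):
>> > >
>> > >     @Override
>> > >     public List<SourceRecord> poll() {
>> > >         TransactionContext transactionContext = context.transactionContext();
>> > >         List<SourceRecord> batch = readNextBatch();
>> > >         if (transactionContext != null && containsInvalidRecord(batch)) {
>> > >             // Commit the transaction that already contains the valid
>> > >             // records returned from earlier polls, and return an empty
>> > >             // batch so no new records are added to it.
>> > >             transactionContext.commitTransaction();
>> > >             return Collections.emptyList();
>> > >         }
>> > >         return batch;
>> > >     }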
>> > > FWIW, I would be a little cautious about this approach. Many times it's better to fail fast on invalid data; it might be worth it to at least allow users to configure whether the connector fails on invalid data, or silently skips over it (which is what happens when transactions are aborted).
>> > >
>> > > > Why is abort not working without adding the last record to the list?
>> > >
>> > > Is it possible that there's a case where an abort is being requested while the current transaction is empty (i.e., the task hasn't returned any records from SourceTask::poll since the last transaction was committed/aborted)? I think this may be a bug in the Connect framework where we don't check to see if a transaction is already open when a task requests that a transaction be aborted, which can cause tasks to fail (see https://issues.apache.org/jira/browse/KAFKA-14799 for more details).
>> > >
>> > > Cheers,
>> > >
>> > > Chris
>> > >
>> > > On Wed, Mar 8, 2023 at 6:44 PM NITTY BENNY <nittybe...@gmail.com> wrote:
>> > >
>> > > > Hi Chris,
>> > > >
>> > > > I am not sure if you are able to see the images I shared with you. Copying the code snippet below:
>> > > >
>> > > >     if (expectedRecordCount >= 0) {
>> > > >         int missingCount = expectedRecordCount - (int) this.recordOffset() - 1;
>> > > >         if (missingCount > 0) {
>> > > >             // Fewer records than the header declared: defer to an abort
>> > > >             // when transactions are in use, otherwise fail immediately.
>> > > >             if (transactionContext != null) {
>> > > >                 isMissedRecords = true;
>> > > >             } else {
>> > > >                 throw new DataException(String.format("Missing %d records (expecting %d, actual %d)", missingCount, expectedRecordCount, this.recordOffset()));
>> > > >             }
>> > > >         } else if (missingCount < 0) {
>> > > >             // More records than the header declared.
>> > > >             if (transactionContext != null) {
>> > > >                 isMissedRecords = true;
>> > > >             } else {
>> > > >                 throw new DataException(String.format("Too many records (expecting %d, actual %d)", expectedRecordCount, this.recordOffset()));
>> > > >             }
>> > > >         }
>> > > >     }
>> > > >     addLastRecord(records, null, value);
>> > > >
>> > > >     // asn1 or binary abort
>> > > >     if ((config.parseErrorThreshold != null && parseErrorCount >= config.parseErrorThreshold
>> > > >             && lastbatch && transactionContext != null)
>> > > >             || (isMissedRecords && transactionContext != null && lastbatch)) {
>> > > >         log.info("Transaction is aborting");
>> > > >         log.info("records = {}", records);
>> > > >         if (!records.isEmpty()) {
>> > > >             log.info("with record");
>> > > >             transactionContext.abortTransaction(records.get(records.size() - 1));
>> > > >         } else {
>> > > >             log.info("without record");
>> > > >             transactionContext.abortTransaction();
>> > > >         }
>> > > >     }
>> > > >
>> > > > Thanks,
>> > > > Nitty
>> > > >
>> > > > On Wed, Mar 8, 2023 at 11:38 PM NITTY BENNY <nittybe...@gmail.com> wrote:
>> > > >
>> > > >> Hi Chris,
>> > > >> Sorry for the typo in my previous email.
>> > > >>
>> > > >> Regarding point 2, *the task returns a batch of records from SourceTask::poll (and, if using the per-record API provided by the TransactionContext class, includes at least one record that should trigger a transaction commit/abort in that batch)*
>> > > >> What if I am using the API without passing a record? We have 2 types of use cases. One is where, on encountering an error record, we want to commit previous successful batches and disregard the failed record and upcoming batches. In this case we created the transactionContext just before reading the file (the file is our transaction boundary). Say if I have a file with 5 records and batch size is 2, and in my 3rd batch I have one error record; then in that batch I don't have a valid record to call commit or abort. But I want to commit all the previous batches that were successfully parsed. How do I do that?
>> > > >>
>> > > >> The second use case is where I want to abort a transaction if the record count doesn't match.
>> > > >> Code Snippet:
>> > > >> [image: image.png]
>> > > >> There are no error records in this case. If you see, I added the transactionContext check condition to implement exactly-once; without it, the code just threw the exception without calling the addLastRecord() method, and in the catch block it just logs the message and returns the list of records, without the last record, to poll(). To make it work, I called the addLastRecord() method in this case, so it does not throw the exception and the list has the last record as well. Then I called the abort, and everything got aborted. Why is abort not working without adding the last record to the list?
>> > > >> [image: image.png]
>> > > >>
>> > > >> Code to call abort.
>> > > >>
>> > > >> Thanks,
>> > > >> Nitty
>> > > >>
>> > > >> On Wed, Mar 8, 2023 at 4:26 PM Chris Egerton <chr...@aiven.io.invalid> wrote:
>> > > >>
>> > > >>> Hi Nitty,
>> > > >>>
>> > > >>> I'm a little confused about what you mean by this part:
>> > > >>>
>> > > >>> > transaction is not getting completed because it is not committing the transaction offset.
>> > > >>>
>> > > >>> The only conditions required for a transaction to be completed when a connector is defining its own transaction boundaries are:
>> > > >>>
>> > > >>> 1. The task requests a transaction commit/abort from the TransactionContext
>> > > >>> 2. The task returns a batch of records from SourceTask::poll (and, if using the per-record API provided by the TransactionContext class, includes at least one record that should trigger a transaction commit/abort in that batch)
>> > > >>>
>> > > >>> The Connect runtime should automatically commit source offsets to Kafka whenever a transaction is completed, either by commit or abort. This is because transactions should only be aborted for data that should never be re-read by the connector; if there is a validation error that should be handled by reconfiguring the connector, then the task should throw an exception instead of aborting the transaction.
>> > > >>>
>> > > >>> If possible, do you think you could provide a brief code snippet illustrating what your task is doing that's causing issues?
>> > > >>>
>> > > >>> Cheers,
>> > > >>>
>> > > >>> Chris (not Chrise 🙂)
>> > > >>>
>> > > >>> On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY <nittybe...@gmail.com> wrote:
>> > > >>>
>> > > >>> > Hi Chrise,
>> > > >>> >
>> > > >>> > Thanks for sharing the details.
>> > > >>> >
>> > > >>> > Regarding the use case: for the Asn1 source connector we have a use case to validate the number of records in the file against the number of records in the header. So currently, if validation fails, we are not sending the last record to the topic. But after introducing exactly-once with the connector transaction boundary, I can see that if I call an abort when the validation fails, the transaction is not getting completed because it is not committing the transaction offset. I saw that the transaction state changed to CompleteAbort.
>> > > >>> > So for my next transaction I am getting InvalidProducerEpochException, and the task stopped after that. I tried calling the abort after sending the last record to the topic, and then the transaction got completed.
>> > > >>> >
>> > > >>> > I don't know if I am doing anything wrong here.
>> > > >>> >
>> > > >>> > Please advise.
>> > > >>> > Thanks,
>> > > >>> > Nitty
>> > > >>> >
>> > > >>> > On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton <chr...@aiven.io.invalid> wrote:
>> > > >>> >
>> > > >>> > > Hi Nitty,
>> > > >>> > >
>> > > >>> > > We've recently added some documentation on implementing exactly-once source connectors here:
>> > > >>> > > https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors
>> > > >>> > > To quote a relevant passage from those docs:
>> > > >>> > >
>> > > >>> > > > In order for a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages.
>> > > >>> > >
>> > > >>> > > So, as long as your source connector is able to use the Kafka Connect framework's offsets API correctly, it shouldn't be necessary to make any other code changes to the connector.
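>> > > >>> > >
>> > > >>> > > As a rough sketch of what "meaningful source offsets" can look like for a file-based connector like yours (the "filename"/"position" keys and surrounding variables are hypothetical, purely for illustration):
>> > > >>> > >
>> > > >>> > >     Map<String, Object> sourcePartition = Collections.singletonMap("filename", currentFileName);
>> > > >>> > >     Map<String, Object> sourceOffset = Collections.singletonMap("position", recordsReadSoFar);
>> > > >>> > >     // On restart, the task can look up the last committed offset for this
>> > > >>> > >     // file via context.offsetStorageReader() and resume from that position.
>> > > >>> > >     SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, topic, valueSchema, value);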
>> > > >>> > > To enable exactly-once support for source connectors on your Connect cluster, see the docs section here:
>> > > >>> > > https://kafka.apache.org/documentation/#connect_exactlyoncesource
>> > > >>> > >
>> > > >>> > > With regard to transactions, a transactional producer is always created automatically for your connector by the Connect runtime when exactly-once support is enabled on the worker. The only reason to set "transaction.boundary" to "connector" is if your connector would like to explicitly define its own transaction boundaries. In this case, it sounds like that may be what you want; I just want to make sure to call out that in either case, you should not be directly instantiating a producer in your connector code, but let the Kafka Connect runtime do that for you, and just worry about returning the right records from SourceTask::poll (and possibly defining custom transactions using the TransactionContext API).
>> > > >>> > >
>> > > >>> > > With respect to your question about committing or aborting in certain circumstances, it'd be useful to know more about your use case, since it may not be necessary to define custom transaction boundaries in your connector at all.
>> > > >>> > >
>> > > >>> > > Cheers,
>> > > >>> > >
>> > > >>> > > Chris
>> > > >>> > >
>> > > >>> > > On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY <nittybe...@gmail.com> wrote:
>> > > >>> > >
>> > > >>> > > > Hi Team,
>> > > >>> > > >
>> > > >>> > > > Adding on top of this, I tried creating a TransactionContext object and calling the commitTransaction and abortTransaction methods in source connectors.
>> > > >>> > > > But the main problem I saw is that if there is any error while parsing the record, Connect is calling an abort, but we have a use case to call commit in some cases. Is that a valid use case in terms of Kafka Connect?
>> > > >>> > > >
>> > > >>> > > > Another question - should I use a transactional producer instead of creating an object of TransactionContext? Below is the connector configuration I am using.
>> > > >>> > > >
>> > > >>> > > > exactly.once.support: "required"
>> > > >>> > > > transaction.boundary: "connector"
>> > > >>> > > >
>> > > >>> > > > Could you please help me here?
>> > > >>> > > >
>> > > >>> > > > Thanks,
>> > > >>> > > > Nitty
>> > > >>> > > >
>> > > >>> > > > On Tue, Mar 7, 2023 at 12:29 AM NITTY BENNY <nittybe...@gmail.com> wrote:
>> > > >>> > > >
>> > > >>> > > > > Hi Team,
>> > > >>> > > > > I am trying to implement exactly once behavior in our source connector. Is there any sample source connector implementation available to have a look at?
>> > > >>> > > > > Regards,
>> > > >>> > > > > Nitty