Hi Nitty,

Sorry, I should have clarified. The reason I'm thinking about shutdown here is that, when exactly-once support is enabled on a Kafka Connect cluster and a new set of task configurations is generated for a connector, the Connect framework makes an effort to shut down all the old task instances for that connector, and then fences out the transactional producers for all of those instances. I was thinking that this may lead to the producer exceptions you are seeing but, after double-checking this assumption, that does not appear to be the case.
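For readers less familiar with the fencing mentioned above: a transactional producer is tied to a transactional ID and an epoch. Registering a new producer under the same ID bumps the epoch, and writes from the older instance are then rejected, which is what the "There is a newer producer with the same transactionalId which fences the current one" log message reports. The Java below is a minimal toy model of that idea, not Kafka's transaction coordinator; `ToyTxnCoordinator` and its `register`/`write` methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only (hypothetical names, NOT Kafka's transaction coordinator):
// each registration under a transactional ID bumps that ID's epoch, and a
// write carrying an older epoch is rejected as "fenced".
class ToyTxnCoordinator {
    private final Map<String, Integer> epochs = new HashMap<>();

    /** Registers a new producer instance and returns the epoch it must use. */
    int register(String transactionalId) {
        // First registration gets epoch 0; each later one gets previous + 1.
        return epochs.merge(transactionalId, 0, (previous, ignored) -> previous + 1);
    }

    /** Accepts a write only from the most recently registered instance. */
    void write(String transactionalId, int producerEpoch) {
        Integer current = epochs.get(transactionalId);
        if (current == null) {
            throw new IllegalStateException("unknown transactional ID: " + transactionalId);
        }
        if (producerEpoch < current) {
            throw new IllegalStateException("fenced: epoch " + producerEpoch
                    + " is older than current epoch " + current);
        }
    }
}
```

For example, once a new producer is registered for `connect-cluster-json-sftp-source-connector-0`, writes from the new instance are accepted while any write still carrying the old epoch fails.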
Would it be possible to share the source code for your connector and a reproduction scenario for what you're seeing? That may be easier than coordinating a call.

Cheers,

Chris

On Tue, Mar 14, 2023 at 6:15 AM NITTY BENNY <nittybe...@gmail.com> wrote:

Hi Chris,

Is there any possibility of having a call with you? This is actually blocking our delivery, and I really want to get this sorted.

Thanks,
Nitty

On Mon, Mar 13, 2023 at 8:18 PM NITTY BENNY <nittybe...@gmail.com> wrote:

Hi Chris,

I really don't understand why a graceful shutdown would happen during a commit operation. Am I misunderstanding something here? I see this happen when I have a batch of 2 valid records and the record in the second batch is invalid. In that case I want to commit the valid records, so I called commit and returned an empty list from poll() for the current batch. Then, when the next file comes in and poll() sees new records, I get an InvalidProducerEpochException.
Please advise.

Thanks,
Nitty

On Mon, Mar 13, 2023 at 5:33 PM NITTY BENNY <nittybe...@gmail.com> wrote:

Hi Chris,

The difference is in the Task classes; there is no difference in the key/value converters.

I don't see log messages for a graceful shutdown. I am not clear on what you mean by shutting down the task.

I called the commit operation for the successful records. Should I perform any other steps if I have an invalid record?
Please advise.

Thanks,
Nitty

On Mon, Mar 13, 2023 at 3:42 PM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Nitty,

Thanks again for all the details here, especially the log messages.

> The below-mentioned issue is happening for the JSON connector only. Is there any difference between the asn1, binary, csv and json connectors?

Can you clarify whether the difference here is in the Connector/Task classes, or in the key/value converters that are configured for the connector? The key/value converters are configured using the "key.converter" and "value.converter" properties and, if problems arise with them, the task will fail and, if it has a non-empty ongoing transaction, that transaction will be automatically aborted since we close the task's Kafka producer when it fails (or shuts down gracefully).

With regards to these log messages:

> org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.

It looks like your tasks aren't shutting down gracefully in time, which causes them to be fenced out by the Connect framework later on. Do you see messages like "Graceful stop of task <TASK ID HERE> failed" in the logs for your Connect worker?

Cheers,

Chris

On Mon, Mar 13, 2023 at 10:58 AM NITTY BENNY <nittybe...@gmail.com> wrote:

Hi Chris,

As you said, when I call an abort because there is an invalid record, I then see the message below for the next transaction, and the connector stops.

2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0] Aborting transaction for batch as requested by connector (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask) [task-thread-json-sftp-source-connector-0]
2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0] [Producer clientId=connector-producer-json-sftp-source-connector-0, transactionalId=connect-cluster-json-sftp-source-connector-0] Aborting incomplete transaction (org.apache.kafka.clients.producer.KafkaProducer) [task-thread-json-sftp-source-connector-0]

The InvalidProducerEpochException issue happens when I call commit because there is an invalid record: in the next transaction I get an InvalidProducerEpochException (the log messages are copied in my previous email). I don't know if this will also be fixed by your bug fix. I am using Kafka 3.3.1 as of now.

Thanks,
Nitty

On Mon, Mar 13, 2023 at 10:47 AM NITTY BENNY <nittybe...@gmail.com> wrote:

Hi Chris,

The below-mentioned issue is happening for the JSON connector only. Is there any difference between the asn1, binary, csv and json connectors?

Thanks,
Nitty

On Mon, Mar 13, 2023 at 9:16 AM NITTY BENNY <nittybe...@gmail.com> wrote:

Hi Chris,

Sorry Chris, I am not able to reproduce the above issue.

I want to share one more use case I found. In the first batch, poll() returns 2 valid records, and the next batch is an invalid record. Below is the __transaction_state topic when I call a commit while processing the invalid record:

connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0, producerId=11, producerEpoch=2, txnTimeoutMs=60000, state=*Ongoing*, pendingState=None, topicPartitions=HashSet(streams-input-2), txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620463834)

Then, after some time, I saw the states below as well:

connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0, producerId=11, producerEpoch=3, txnTimeoutMs=60000, state=*PrepareAbort*, pendingState=None, topicPartitions=HashSet(streams-input-2), txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526119)
connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0, producerId=11, producerEpoch=3, txnTimeoutMs=60000, state=*CompleteAbort*, pendingState=None, topicPartitions=HashSet(), txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526121)

Later, for the next transaction, when poll() returns the first batch, these are the logs I see:

Transiting to abortable error state due to org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch. (org.apache.kafka.clients.producer.internals.TransactionManager) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
2023-03-12 11:32:45,220 ERROR [json-sftp-source-connector|task-0] ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to send record to streams-input: (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
2023-03-12 11:32:45,222 INFO [json-sftp-source-connector|task-0] [Producer clientId=connector-producer-json-sftp-source-connector-0, transactionalId=connect-cluster-json-sftp-source-connector-0] Transiting to fatal error state due to org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one. (org.apache.kafka.clients.producer.internals.TransactionManager) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0] [Producer clientId=connector-producer-json-sftp-source-connector-0, transactionalId=connect-cluster-json-sftp-source-connector-0] Aborting producer batches due to fatal error (org.apache.kafka.clients.producer.internals.Sender) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0] ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to flush offsets to storage: (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
2023-03-12 11:32:45,224 ERROR [json-sftp-source-connector|task-0] ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to send record to streams-input: (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0|offsets] ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to commit producer transaction (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask) [task-thread-json-sftp-source-connector-0]
org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
2023-03-12 11:32:45,225 ERROR [json-sftp-source-connector|task-0] ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-json-sftp-source-connector-0]

Do you know why it shows an abort state even though I called commit?

I tested one more scenario. When I call the commit, I see the following:

connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0, producerId=11, producerEpoch=2, txnTimeoutMs=60000, state=*Ongoing*, pendingState=None, topicPartitions=HashSet(streams-input-2), txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620463834)

Then, if I drop the next file before the state changes to abort, I don't see any issues: both the previous transaction and the current transaction are committed.

Thank you for your support.

Thanks,
Nitty

On Fri, Mar 10, 2023 at 8:04 PM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Nitty,

> I called commitTransaction when I reach the first error record, but commit is not happening for me. Kafka Connect tries to abort the transaction automatically

This is really interesting. Are you certain that your task never invoked TransactionContext::abortTransaction in this case? I'm looking over the code base and it seems fairly clear that the only thing that could trigger a call to KafkaProducer::abortTransaction is a request by the task to abort a transaction (either for a next batch, or for a specific record). It may help to run the connector in a debugger and/or look for "Aborting transaction for batch as requested by connector" or "Aborting transaction for record on topic <TOPIC NAME HERE> as requested by connector" log messages (which will be emitted at INFO level by the org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask class if the task is requesting an abort).

Regardless, I'll work on a fix for the bug with aborting empty transactions. Thanks for helping uncover that one!

Cheers,

Chris

On Thu, Mar 9, 2023 at 6:36 PM NITTY BENNY <nittybe...@gmail.com> wrote:

Hi Chris,

We have a use case to commit the previous successful records, stop processing the current file, and move on to the next file. To achieve that, I called commitTransaction when I reached the first error record, but the commit is not happening for me. Kafka Connect tries to abort the transaction automatically; I checked the __transaction_state topic and the states are marked as PrepareAbort and CompleteAbort. Do you know why Kafka Connect automatically invokes abort instead of the commit I explicitly called?
As a result, when I try to parse the next file, say ABC, I see the logs "Aborting incomplete transaction" and ERROR: "Failed to send record to topic", and we lose the first batch of records from the current transaction in the file ABC.

> Is it possible that there's a case where an abort is being requested while the current transaction is empty (i.e., the task hasn't returned any records from SourceTask::poll since the last transaction was committed/aborted)?

Yes, that case is possible for us. There is a case where the first record itself is an error record.

Thanks,
Nitty

On Thu, Mar 9, 2023 at 3:48 PM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Nitty,

Thanks for the code examples and the detailed explanations, this is really helpful!

> Say if I have a file with 5 records and batch size is 2, and in my 3rd batch I have one error record then in that batch, I don't have a valid record to call commit or abort. But I want to commit all the previous batches that were successfully parsed. How do I do that?

An important thing to keep in mind with the TransactionContext API is that all records that a task returns from SourceTask::poll are implicitly included in a transaction. Invoking SourceTaskContext::transactionContext doesn't alter this or cause transactions to start being used; everything is already in a transaction, and the Connect runtime automatically begins transactions for any records it sees from the task if it hasn't already begun one. It's also valid to return a null or empty list of records from SourceTask::poll. So in this case, you can invoke transactionContext.commitTransaction() (the no-args variant) and return an empty batch from SourceTask::poll, which will cause the transaction containing the 4 valid records that were returned in the last 2 batches to be committed.

FWIW, I would be a little cautious about this approach. Many times it's better to fail fast on invalid data; it might be worth it to at least allow users to configure whether the connector fails on invalid data, or silently skips over it (which is what happens when transactions are aborted).

> Why is abort not working without adding the last record to the list?

Is it possible that there's a case where an abort is being requested while the current transaction is empty (i.e., the task hasn't returned any records from SourceTask::poll since the last transaction was committed/aborted)? I think this may be a bug in the Connect framework where we don't check to see if a transaction is already open when a task requests that a transaction be aborted, which can cause tasks to fail (see https://issues.apache.org/jira/browse/KAFKA-14799 for more details).
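To make the commit-and-return-empty suggestion concrete, here is a minimal, self-contained Java sketch of the file scenario (5 records, batch size 2, an error record in the 3rd batch). It is a toy model of the behaviour described above, not the Connect runtime: `ToyRuntime` and `ToyFileTask` are hypothetical, records are plain strings, and a record counts as invalid if it starts with "BAD". In a real task the commit would be requested through the `TransactionContext` obtained from the task context, and `poll()` would return `SourceRecord` objects.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for the runtime (NOT Kafka Connect): whatever poll() returns
// joins the open transaction, and a commit requested via commitTransaction()
// takes effect once the next poll() result has been processed, even if that
// result is empty.
class ToyRuntime {
    final List<String> openTransaction = new ArrayList<>();
    final List<String> committed = new ArrayList<>();
    private boolean commitRequested = false;

    // Mirrors the intent of TransactionContext::commitTransaction() (no-args).
    void commitTransaction() {
        commitRequested = true;
    }

    // Called with each poll() result; null and empty batches are allowed.
    void onPollReturned(List<String> batch) {
        if (batch != null) {
            openTransaction.addAll(batch);
        }
        if (commitRequested) {
            committed.addAll(openTransaction);
            openTransaction.clear();
            commitRequested = false;
        }
    }
}

// Hypothetical task that treats one file as one transaction: on the first
// invalid record it requests a commit of everything returned so far and
// skips the rest of the file.
class ToyFileTask {
    private final ToyRuntime runtime;
    private final List<String> fileRecords;
    private final int batchSize;
    private int position = 0;

    ToyFileTask(ToyRuntime runtime, List<String> fileRecords, int batchSize) {
        this.runtime = runtime;
        this.fileRecords = fileRecords;
        this.batchSize = batchSize;
    }

    List<String> poll() {
        List<String> batch = new ArrayList<>();
        while (position < fileRecords.size() && batch.size() < batchSize) {
            String record = fileRecords.get(position++);
            if (record.startsWith("BAD")) {
                runtime.commitTransaction();   // commit the valid records so far
                position = fileRecords.size(); // stop reading this file
                return batch;                  // possibly empty, which is allowed
            }
            batch.add(record);
        }
        return batch;
    }
}
```

Driving three polls over `r1, r2, r3, r4, BAD5` leaves `r1` through `r4` in `committed` and nothing open. Given the caution about failing fast, a real connector might make this skip-on-error behaviour configurable rather than hard-coding it.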

Cheers,

Chris

On Wed, Mar 8, 2023 at 6:44 PM NITTY BENNY <nittybe...@gmail.com> wrote:

Hi Chris,

I am not sure if you are able to see the images I shared with you. Copying the code snippet below:

    if (expectedRecordCount >= 0) {
        int missingCount = expectedRecordCount - (int) this.recordOffset() - 1;
        if (missingCount > 0) {
            if (transactionContext != null) {
                isMissedRecords = true;
            } else {
                throw new DataException(String.format("Missing %d records (expecting %d, actual %d)",
                        missingCount, expectedRecordCount, this.recordOffset()));
            }
        } else if (missingCount < 0) {
            if (transactionContext != null) {
                isMissedRecords = true;
            } else {
                throw new DataException(String.format("Too many records (expecting %d, actual %d)",
                        expectedRecordCount, this.recordOffset()));
            }
        }
    }
    addLastRecord(records, null, value);
    } // closes an enclosing block that is not part of this excerpt

    // asn1 or binary abort
    if ((config.parseErrorThreshold != null && parseErrorCount >= config.parseErrorThreshold
            && lastbatch && transactionContext != null)
            || (isMissedRecords && transactionContext != null && lastbatch)) {
        log.info("Transaction is aborting");
        log.info("records = {}", records);
        if (!records.isEmpty()) {
            log.info("with record");
            // abort the transaction that ends with the last record of this batch
            transactionContext.abortTransaction(records.get(records.size() - 1));
        } else {
            log.info("without record");
            // no records in this batch: abort after the (empty) batch is processed
            transactionContext.abortTransaction();
        }
    }

Thanks,
Nitty

On Wed, Mar 8, 2023 at 11:38 PM NITTY BENNY <nittybe...@gmail.com> wrote:

Hi Chris,

Sorry for the typo in my previous email.

Regarding point 2, "the task returns a batch of records from SourceTask::poll (and, if using the per-record API provided by the TransactionContext class, includes at least one record that should trigger a transaction commit/abort in that batch)": what if I am using the API without passing a record? We have 2 types of use cases. In the first, on encountering an error record, we want to commit the previous successful batches and disregard the failed record and the upcoming batches. In this case we created the transactionContext just before reading the file (the file is our transaction boundary). Say I have a file with 5 records and the batch size is 2, and my 3rd batch has one error record: in that batch, I don't have a valid record to call commit or abort with. But I want to commit all the previous batches that were successfully parsed. How do I do that?

The second use case is where I want to abort a transaction if the record count doesn't match.
Code snippet:
[image: image.png]
There are no error records in this case. As you can see, I added the transactionContext check to implement exactly-once; without a transaction, it just threw the exception without calling the addLastRecord() method, and the catch block just logged the message and returned the list of records, without the last record, to poll(). To make it work, I call addLastRecord() in this case, so it doesn't throw the exception and the list includes the last record as well. Then I called abort and everything got aborted. Why is abort not working without adding the last record to the list?
[image: image.png]

Code to call abort.
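Related to the question above and to the empty-transaction abort issue (KAFKA-14799) discussed in this thread: one possible task-side guard is to count the records returned from poll() since the last commit/abort request and skip the abort when the transaction would be empty, since there is nothing to abort in that case. This is a hypothetical sketch (`AbortGuard` is not part of the Kafka Connect API), and it assumes that the no-args `abortTransaction()` applies after the batch returned from the same poll() call.

```java
import java.util.List;

// Hypothetical helper, not part of the Kafka Connect API. It counts the
// records returned from poll() since the last commit/abort request so the
// task can avoid requesting an abort for an empty transaction.
class AbortGuard {
    private int recordsSinceLastBoundary = 0;

    /** Call for each batch returned from poll() without a commit/abort request. */
    void returned(List<?> batch) {
        recordsSinceLastBoundary += batch.size();
    }

    /**
     * Call when the task wants to abort. The batch is the one poll() is about
     * to return, because the no-args abort is assumed to apply after that
     * batch. Returns true only if the transaction would contain records.
     */
    boolean shouldAbortWith(List<?> batch) {
        boolean nonEmpty = recordsSinceLastBoundary + batch.size() > 0;
        recordsSinceLastBoundary = 0; // a new transaction starts after this boundary
        return nonEmpty;
    }

    /** Call when the task requests a commit. */
    void committed() {
        recordsSinceLastBoundary = 0;
    }
}
```

In the snippet above, the `else` branch that calls `transactionContext.abortTransaction()` with an empty `records` list is where such a check could go: if `shouldAbortWith(records)` returns false, the task could return the empty batch without requesting an abort.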

Thanks,
Nitty

On Wed, Mar 8, 2023 at 4:26 PM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Nitty,

I'm a little confused about what you mean by this part:

> transaction is not getting completed because it is not committing the transaction offset.

The only conditions required for a transaction to be completed when a connector is defining its own transaction boundaries are:

1. The task requests a transaction commit/abort from the TransactionContext
2. The task returns a batch of records from SourceTask::poll (and, if using the per-record API provided by the TransactionContext class, includes at least one record that should trigger a transaction commit/abort in that batch)

The Connect runtime should automatically commit source offsets to Kafka whenever a transaction is completed, either by commit or abort. This is because transactions should only be aborted for data that should never be re-read by the connector; if there is a validation error that should be handled by reconfiguring the connector, then the task should throw an exception instead of aborting the transaction.

If possible, do you think you could provide a brief code snippet illustrating what your task is doing that's causing issues?

Cheers,

Chris (not Chrise 🙂)

On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY <nittybe...@gmail.com> wrote:

Hi Chrise,

Thanks for sharing the details.

Regarding the use case: for the Asn1 source connector, we have a use case to validate the number of records in the file against the number of records in the header. Currently, if validation fails, we do not send the last record to the topic. But after introducing exactly-once with connector-defined transaction boundaries, I can see that if I call an abort when the validation fails, the transaction is not completed because it is not committing the transaction offset. I saw that the transaction state changed to CompleteAbort. So for my next transaction I get an InvalidProducerEpochException, and the task stops after that. When I tried calling abort after sending the last record to the topic, the transaction completed.

I don't know if I am doing anything wrong here.

Please advise.
Thanks,
Nitty

On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Nitty,

We've recently added some documentation on implementing exactly-once source connectors here: https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors. To quote a relevant passage from those docs:

> In order for a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages.
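As an illustration of what "meaningful source offsets for each record" can look like for a line-oriented file source like the ones in this thread, here is a hypothetical Java sketch: the source partition identifies the file, and the source offset records the index of the line a record was read from, so a restarted task can resume at the next line. The "file"/"line" keys and the `FileOffsets` class are assumptions. In a real connector these maps would be passed to the `SourceRecord` constructor and read back through the task context's `OffsetStorageReader`; this stdlib-only sketch stands in plain maps for both.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for a line-oriented file source. The "file" and "line"
// keys are illustrative assumptions, not names defined by Kafka Connect.
final class FileOffsets {
    private FileOffsets() {}

    /** Source partition: which file a record came from. */
    static Map<String, Object> partition(String fileName) {
        Map<String, Object> partition = new HashMap<>();
        partition.put("file", fileName);
        return partition;
    }

    /** Source offset: zero-based index of the line a record was read from. */
    static Map<String, Object> offset(long lineIndex) {
        Map<String, Object> offset = new HashMap<>();
        offset.put("line", lineIndex);
        return offset;
    }

    /** First line to read, given the last committed offset (null if none). */
    static long resumeFrom(Map<String, Object> committedOffset) {
        if (committedOffset == null || committedOffset.get("line") == null) {
            return 0L; // nothing committed yet for this file
        }
        // Read via Number: a stored offset may come back as Integer or Long.
        return ((Number) committedOffset.get("line")).longValue() + 1;
    }

    /** Lines that still need to be emitted after a restart. */
    static List<String> remaining(List<String> lines, Map<String, Object> committedOffset) {
        int start = (int) Math.min(resumeFrom(committedOffset), lines.size());
        return new ArrayList<>(lines.subList(start, lines.size()));
    }
}
```

Under the assumption that offsets are committed together with each transaction (whether it is committed or aborted), resuming from the line after the committed one should neither re-emit a committed line nor skip an uncommitted one.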

So, as long as your source connector is able to use the Kafka Connect framework's offsets API correctly, it shouldn't be necessary to make any other code changes to the connector.

To enable exactly-once support for source connectors on your Connect cluster, see the docs section here: https://kafka.apache.org/documentation/#connect_exactlyoncesource

With regard to transactions, a transactional producer is always created automatically for your connector by the Connect runtime when exactly-once support is enabled on the worker. The only reason to set "transaction.boundary" to "connector" is if your connector would like to explicitly define its own transaction boundaries. In this case, it sounds like that may be what you want; I just want to make sure to call out that in either case, you should not be directly instantiating a producer in your connector code, but let the Kafka Connect runtime do that for you, and just worry about returning the right records from SourceTask::poll (and possibly defining custom transactions using the TransactionContext API).

With respect to your question about committing or aborting in certain circumstances, it'd be useful to know more about your use case, since it may not be necessary to define custom transaction boundaries in your connector at all.

Cheers,

Chris

On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY <nittybe...@gmail.com> wrote:

Hi Team,

Adding on top of this, I tried creating a TransactionContext object and calling the commitTransaction and abortTransaction methods in source connectors.
But the main problem I saw is that if there is any error while parsing the record, Connect calls an abort, but we have a use case to call commit in some cases. Is that a valid use case in terms of Kafka Connect?

Another question: should I use a transactional producer instead of creating a TransactionContext object? Below is the connector configuration I am using.

    exactly.once.support: "required"
    transaction.boundary: "connector"

Could you please help me here?

Thanks,
Nitty

On Tue, Mar 7, 2023 at 12:29 AM NITTY BENNY <nittybe...@gmail.com> wrote:

Hi Team,

I am trying to implement exactly-once behavior in our source connector. Is there any sample source connector implementation available to have a look at?

Regards,
Nitty