Hi Chris,

The difference is in the Task classes; there is no difference in the key/value converters.
I don't see any log messages about graceful shutdown, and I am not clear on what you mean by shutting down the task. I called the commit operation for the successful records. Should I perform any other steps when I have an invalid record? Please advise.

Thanks,
Nitty

On Mon, Mar 13, 2023 at 3:42 PM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Nitty,

Thanks again for all the details here, especially the log messages.

> The below mentioned issue is happening for the JSON connector only. Is there any difference between the asn1, binary, csv and json connectors?

Can you clarify if the difference here is in the Connector/Task classes, or if it's in the key/value converters that are configured for the connector? The key/value converters are configured using the "key.converter" and "value.converter" properties and, if problems arise with them, the task will fail and, if it has a non-empty ongoing transaction, that transaction will be automatically aborted since we close the task's Kafka producer when it fails (or shuts down gracefully).

With regard to these log messages:

> org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.

It looks like your tasks aren't shutting down gracefully in time, which causes them to be fenced out by the Connect framework later on. Do you see messages like "Graceful stop of task <TASK ID HERE> failed" in the logs for your Connect worker?

Cheers,

Chris

On Mon, Mar 13, 2023 at 10:58 AM NITTY BENNY <nittybe...@gmail.com> wrote:

Hi Chris,

As you said, the below message appears when I call an abort if there is an invalid record. Then, for the next transaction, I can see the below message and the connector is stopped.
2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0] Aborting transaction for batch as requested by connector (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask) [task-thread-json-sftp-source-connector-0]
2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0] [Producer clientId=connector-producer-json-sftp-source-connector-0, transactionalId=connect-cluster-json-sftp-source-connector-0] Aborting incomplete transaction (org.apache.kafka.clients.producer.KafkaProducer) [task-thread-json-sftp-source-connector-0]

The issue with InvalidProducerEpoch happens when I call the commit if there is an invalid record: in the next transaction I get an InvalidProducerEpochException, and the messages are copied in the previous email. I don't know if this will also be fixed by your bug fix. I am using Kafka 3.3.1 as of now.

Thanks,
Nitty

On Mon, Mar 13, 2023 at 10:47 AM NITTY BENNY <nittybe...@gmail.com> wrote:

Hi Chris,

The below mentioned issue is happening for the JSON connector only. Is there any difference between the asn1, binary, csv and json connectors?

Thanks,
Nitty

On Mon, Mar 13, 2023 at 9:16 AM NITTY BENNY <nittybe...@gmail.com> wrote:

Hi Chris,

Sorry Chris, I am not able to reproduce the above issue.

I want to share with you one more use case I found. In the first batch the task returns 2 valid records, and the next batch is an invalid record. Below is the transaction_state topic when I call a commit while processing the invalid record.
connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0, producerId=11, producerEpoch=2, txnTimeoutMs=60000, state=*Ongoing*, pendingState=None, topicPartitions=HashSet(streams-input-2), txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620463834)

Then, after some time, I saw the below states as well:

connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0, producerId=11, producerEpoch=3, txnTimeoutMs=60000, state=*PrepareAbort*, pendingState=None, topicPartitions=HashSet(streams-input-2), txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526119)
connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0, producerId=11, producerEpoch=3, txnTimeoutMs=60000, state=*CompleteAbort*, pendingState=None, topicPartitions=HashSet(), txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526121)

Later, for the next transaction, when the task returns the first batch, these are the logs I can see:

Transiting to abortable error state due to org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
(org.apache.kafka.clients.producer.internals.TransactionManager) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
2023-03-12 11:32:45,220 ERROR [json-sftp-source-connector|task-0] ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to send record to streams-input: (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
2023-03-12 11:32:45,222 INFO [json-sftp-source-connector|task-0] [Producer clientId=connector-producer-json-sftp-source-connector-0, transactionalId=connect-cluster-json-sftp-source-connector-0] Transiting to fatal error state due to org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one. (org.apache.kafka.clients.producer.internals.TransactionManager) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0] [Producer clientId=connector-producer-json-sftp-source-connector-0, transactionalId=connect-cluster-json-sftp-source-connector-0] Aborting producer batches due to fatal error (org.apache.kafka.clients.producer.internals.Sender) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0] ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to flush offsets to storage: (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
2023-03-12 11:32:45,224 ERROR [json-sftp-source-connector|task-0] ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to send record to streams-input: (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0|offsets] ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to commit producer transaction (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask) [task-thread-json-sftp-source-connector-0]
org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
2023-03-12 11:32:45,225 ERROR [json-sftp-source-connector|task-0] ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-json-sftp-source-connector-0]

Do you know why it is showing an abort state even if I call commit?
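One possible explanation, not confirmed anywhere in this thread: in the dumps above, the transaction stays Ongoing from txnStartTimestamp until the PrepareAbort update roughly 62 seconds later, which is longer than txnTimeoutMs=60000. As far as I understand, the transaction coordinator proactively aborts transactions that exceed their timeout and bumps the producer epoch while doing so, which would fit producerEpoch going from 2 to 3 and the InvalidProducerEpochException that follows. The arithmetic is easy to check; the values are copied from the dumps, and the class name is made up for this sketch.

```java
// Checks whether the Ongoing -> PrepareAbort gap in the dumps exceeds txnTimeoutMs.
class TxnTimeoutCheck {
    static long elapsedMs(long txnStartTimestamp, long txnLastUpdateTimestamp) {
        return txnLastUpdateTimestamp - txnStartTimestamp;
    }

    static boolean exceededTimeout(long elapsedMs, long txnTimeoutMs) {
        return elapsedMs > txnTimeoutMs;
    }

    public static void main(String[] args) {
        long txnStart = 1678620463834L;     // txnStartTimestamp of the Ongoing transaction
        long prepareAbort = 1678620526119L; // txnLastUpdateTimestamp of the PrepareAbort state
        long txnTimeoutMs = 60000L;         // txnTimeoutMs from the same dumps
        long elapsed = elapsedMs(txnStart, prepareAbort);
        // prints: elapsed=62285 ms, exceeded=true
        System.out.println("elapsed=" + elapsed + " ms, exceeded=" + exceededTimeout(elapsed, txnTimeoutMs));
    }
}
```

If this reading is right, it would also be consistent with the next scenario in this message, where dropping the next file before the state changed to Abort avoided the problem.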
I tested one more scenario. When I call the commit, I see the below:

connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0, producerId=11, producerEpoch=2, txnTimeoutMs=60000, state=*Ongoing*, pendingState=None, topicPartitions=HashSet(streams-input-2), txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620463834)

Then, before the state changed to Abort, I dropped the next file, and I didn't see any issues. The previous transaction as well as the current transaction were committed.

Thank you for your support.

Thanks,
Nitty

On Fri, Mar 10, 2023 at 8:04 PM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Nitty,

> I called commitTransaction when I reach the first error record, but commit is not happening for me. Kafka connect tries to abort the transaction automatically

This is really interesting. Are you certain that your task never invoked TransactionContext::abortTransaction in this case? I'm looking over the code base and it seems fairly clear that the only thing that could trigger a call to KafkaProducer::abortTransaction is a request by the task to abort a transaction (either for a next batch, or for a specific record). It may help to run the connector in a debugger and/or look for "Aborting transaction for batch as requested by connector" or "Aborting transaction for record on topic <TOPIC NAME HERE> as requested by connector" log messages (which will be emitted at INFO level by the org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask class if the task is requesting an abort).
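Following the suggestion above, a worker log can also be checked for these messages without a debugger. The helper below is hypothetical (it is not part of Kafka Connect); its patterns are copied from the messages named in this thread, including the "Graceful stop of task ... failed" message from Chris's Mar 13 reply, and the exact wording may differ between Kafka versions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka Connect.
class AbortLogScanner {
    // Message texts copied from this thread; \S+ stands in for the task ID or topic name.
    private static final List<Pattern> PATTERNS = List.of(
            Pattern.compile("Aborting transaction for batch as requested by connector"),
            Pattern.compile("Aborting transaction for record on topic \\S+ as requested by connector"),
            Pattern.compile("Graceful stop of task \\S+ failed"));

    /** Returns the lines that match any pattern, in their original order. */
    static List<String> findMatches(List<String> logLines) {
        List<String> matches = new ArrayList<>();
        for (String line : logLines) {
            for (Pattern pattern : PATTERNS) {
                if (pattern.matcher(line).find()) {
                    matches.add(line);
                    break; // report each line at most once
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> sample = List.of(
                "2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0] Aborting transaction for batch as requested by connector",
                "2023-03-12 11:32:45,225 ERROR [json-sftp-source-connector|task-0] Task threw an uncaught and unrecoverable exception.");
        // Only the first sample line matches.
        findMatches(sample).forEach(System.out::println);
    }
}
```

In practice the lines could come from java.nio.file.Files.readAllLines on the worker's log file; a match on one of the abort messages means the task itself requested the abort.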
Regardless, I'll work on a fix for the bug with aborting empty transactions. Thanks for helping uncover that one!

Cheers,

Chris

On Thu, Mar 9, 2023 at 6:36 PM NITTY BENNY <nittybe...@gmail.com> wrote:

Hi Chris,

We have a use case to commit the previous successful records, stop processing the current file, and move on to the next file. To achieve that, I called commitTransaction when I reached the first error record, but the commit is not happening for me. Kafka Connect tries to abort the transaction automatically; I checked the __transaction_state topic and the states are marked as PrepareAbort and CompleteAbort. Do you know why Kafka Connect automatically invokes abort instead of the commit I called?

As a result, when I tried to parse the next file (say ABC), I saw the logs "Aborting incomplete transaction" and ERROR: "Failed to send record to topic", and we lost the first batch of records from the current transaction in the file ABC.

> Is it possible that there's a case where an abort is being requested while the current transaction is empty (i.e., the task hasn't returned any records from SourceTask::poll since the last transaction was committed/aborted)?

Yes, that case is possible for us. There is a case where the first record itself is an error record.

Thanks,
Nitty

On Thu, Mar 9, 2023 at 3:48 PM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Nitty,

Thanks for the code examples and the detailed explanations, this is really helpful!
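The approach Chris describes in this reply (call the no-args transactionContext.commitTransaction() and return an empty batch from SourceTask::poll) can be modeled for Nitty's "one file = one transaction" use case. This is a minimal sketch under stated assumptions: TxnBoundaries is a hypothetical stand-in for the no-arg methods of org.apache.kafka.connect.source.TransactionContext so the code runs without Kafka on the classpath, records are plain strings instead of SourceRecord objects, and the "BAD" prefix is a made-up validity check. In a real task the context would come from SourceTaskContext::transactionContext, which returns null unless the connector defines its own transaction boundaries.

```java
import java.util.ArrayList;
import java.util.List;

class CommitOnFirstError {
    // Stand-in (assumption) for the no-arg boundary methods of
    // org.apache.kafka.connect.source.TransactionContext.
    interface TxnBoundaries {
        void commitTransaction();
        void abortTransaction();
    }

    // Records which boundary requests the task made, for inspection.
    static class RecordingTxn implements TxnBoundaries {
        final List<String> requests = new ArrayList<>();
        public void commitTransaction() { requests.add("commit"); }
        public void abortTransaction() { requests.add("abort"); }
    }

    // Hypothetical per-file poller: the file is the transaction boundary.
    static class FilePoller {
        private final List<String> lines;
        private final int batchSize;
        private final TxnBoundaries txn;
        private int position = 0;
        private boolean fileDone = false;

        FilePoller(List<String> lines, int batchSize, TxnBoundaries txn) {
            this.lines = lines;
            this.batchSize = batchSize;
            this.txn = txn;
        }

        // Made-up validity check; a real task would parse the record here.
        static boolean isValid(String line) {
            return !line.startsWith("BAD");
        }

        // Plays the role of SourceTask::poll; returning an empty list is allowed.
        List<String> poll() {
            List<String> batch = new ArrayList<>();
            if (fileDone) {
                return batch;
            }
            while (batch.size() < batchSize && position < lines.size()) {
                String line = lines.get(position++);
                if (!isValid(line)) {
                    // Commit everything returned so far (earlier batches plus any
                    // valid records in this batch) and skip the rest of the file.
                    txn.commitTransaction();
                    fileDone = true;
                    return batch;
                }
                batch.add(line);
            }
            if (position >= lines.size()) {
                txn.commitTransaction(); // clean end of file: commit the whole file
                fileDone = true;
            }
            return batch;
        }
    }

    public static void main(String[] args) {
        RecordingTxn txn = new RecordingTxn();
        FilePoller poller = new FilePoller(List.of("r1", "r2", "r3", "r4", "BAD5"), 2, txn);
        for (int i = 1; i <= 3; i++) {
            List<String> batch = poller.poll();
            System.out.println("poll " + i + " -> " + batch + " requests=" + txn.requests);
        }
    }
}
```

With the 5-record file and batch size 2 from the question, the first two polls return [r1, r2] and [r3, r4] with no boundary request; the third poll reaches BAD5, requests a commit and returns an empty list, so the 4 valid records are committed together. If the very first record of a file is invalid, this requests a commit with nothing in the transaction, which may deserve the same empty-transaction caution discussed in this thread.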
> Say if I have a file with 5 records and batch size is 2, and in my 3rd batch I have one error record then in that batch, I don't have a valid record to call commit or abort. But I want to commit all the previous batches that were successfully parsed. How do I do that?

An important thing to keep in mind with the TransactionContext API is that all records that a task returns from SourceTask::poll are implicitly included in a transaction. Invoking SourceTaskContext::transactionContext doesn't alter this or cause transactions to start being used; everything is already in a transaction, and the Connect runtime automatically begins transactions for any records it sees from the task if it hasn't already begun one. It's also valid to return a null or empty list of records from SourceTask::poll. So in this case, you can invoke transactionContext.commitTransaction() (the no-args variant) and return an empty batch from SourceTask::poll, which will cause the transaction containing the 4 valid records that were returned in the last 2 batches to be committed.

FWIW, I would be a little cautious about this approach. Many times it's better to fail fast on invalid data; it might be worth it to at least allow users to configure whether the connector fails on invalid data, or silently skips over it (which is what happens when transactions are aborted).

> Why is abort not working without adding the last record to the list?
Is it possible that there's a case where an abort is being requested while the current transaction is empty (i.e., the task hasn't returned any records from SourceTask::poll since the last transaction was committed/aborted)? I think this may be a bug in the Connect framework where we don't check to see if a transaction is already open when a task requests that a transaction be aborted, which can cause tasks to fail (see https://issues.apache.org/jira/browse/KAFKA-14799 for more details).

Cheers,

Chris

On Wed, Mar 8, 2023 at 6:44 PM NITTY BENNY <nittybe...@gmail.com> wrote:

Hi Chris,

I am not sure if you are able to see the images I shared with you. Copying the code snippet below:

    // Excerpt from the task's parsing logic; fields such as expectedRecordCount,
    // transactionContext, isMissedRecords and records are defined elsewhere.
    if (expectedRecordCount >= 0) {
        int missingCount = expectedRecordCount - (int) this.recordOffset() - 1;
        if (missingCount > 0) {
            if (transactionContext != null) {
                isMissedRecords = true;
            } else {
                throw new DataException(String.format("Missing %d records (expecting %d, actual %d)",
                        missingCount, expectedRecordCount, this.recordOffset()));
            }
        } else if (missingCount < 0) {
            if (transactionContext != null) {
                isMissedRecords = true;
            } else {
                throw new DataException(String.format("Too many records (expecting %d, actual %d)",
                        expectedRecordCount, this.recordOffset()));
            }
        }
    }
    addLastRecord(records, null, value);
    } // closes a block that starts before this excerpt

    // asn1 or binary abort
    if ((config.parseErrorThreshold != null && parseErrorCount >= config.parseErrorThreshold
            && lastbatch && transactionContext != null)
            || (isMissedRecords && transactionContext != null && lastbatch)) {
        log.info("Transaction is aborting");
        log.info("records = {}", records);
        if (!records.isEmpty()) {
            log.info("with record");
            transactionContext.abortTransaction(records.get(records.size() - 1));
        } else {
            log.info("without record");
            transactionContext.abortTransaction();
        }
    }

Thanks,
Nitty

On Wed, Mar 8, 2023 at 11:38 PM NITTY BENNY <nittybe...@gmail.com> wrote:

Hi Chris,

Sorry for the typo in my previous email.

Regarding point 2:

> the task returns a batch of records from SourceTask::poll (and, if using the per-record API provided by the TransactionContext class, includes at least one record that should trigger a transaction commit/abort in that batch)

What if I am using the API without passing a record?
We have 2 types of use cases. In the first, on encountering an error record, we want to commit the previous successful batches and disregard the failed record and the upcoming batches. In this case we created the transactionContext just before reading the file (the file is our transaction boundary). Say if I have a file with 5 records and batch size is 2, and in my 3rd batch I have one error record then in that batch, I don't have a valid record to call commit or abort. But I want to commit all the previous batches that were successfully parsed. How do I do that?

The second use case is where I want to abort a transaction if the record count doesn't match.

Code snippet:
[image: image.png]

There are no error records in this case. As you can see, I added the transactionContext check to implement exactly-once. Without a transaction, the code just threw the exception without calling the addLastRecord() method, and the catch block logged the message and returned the list of records, without the last record, to poll(). To make it work, I called addLastRecord() in this case, so the exception is not thrown and the list has the last record as well. Then I called the abort, and everything got aborted. Why is abort not working without adding the last record to the list?
[image: image.png]

Code to call abort.
Thanks,
Nitty

On Wed, Mar 8, 2023 at 4:26 PM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Nitty,

I'm a little confused about what you mean by this part:

> transaction is not getting completed because it is not committing the transaction offset.

The only conditions required for a transaction to be completed when a connector is defining its own transaction boundaries are:

1. The task requests a transaction commit/abort from the TransactionContext
2. The task returns a batch of records from SourceTask::poll (and, if using the per-record API provided by the TransactionContext class, includes at least one record that should trigger a transaction commit/abort in that batch)

The Connect runtime should automatically commit source offsets to Kafka whenever a transaction is completed, either by commit or abort. This is because transactions should only be aborted for data that should never be re-read by the connector; if there is a validation error that should be handled by reconfiguring the connector, then the task should throw an exception instead of aborting the transaction.

If possible, do you think you could provide a brief code snippet illustrating what your task is doing that's causing issues?
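Chris's distinction here (abort only for data that should never be re-read, throw for problems that need reconfiguration), together with his suggestion in the Mar 9 reply to let users choose between failing fast and skipping invalid data, could look roughly like this. The OnInvalidData setting is hypothetical, and TaskFailure stands in for org.apache.kafka.connect.errors.ConnectException (or its subclass DataException, which Nitty's snippet already throws) so the sketch runs without Kafka.

```java
import java.util.Locale;

class InvalidDataPolicy {
    // Hypothetical connector setting; the name and values are illustrative only.
    enum OnInvalidData { FAIL, SKIP }

    // Stand-in (assumption) for org.apache.kafka.connect.errors.ConnectException.
    static class TaskFailure extends RuntimeException {
        TaskFailure(String message) {
            super(message);
        }
    }

    static OnInvalidData parse(String configValue) {
        return OnInvalidData.valueOf(configValue.trim().toUpperCase(Locale.ROOT));
    }

    /**
     * FAIL: throw, so the task stops and the problem can be fixed (e.g. by reconfiguring).
     * SKIP: ask for the current transaction to be aborted; the data is dropped for good.
     */
    static void handleInvalidRecord(OnInvalidData policy, String reason, Runnable requestAbort) {
        switch (policy) {
            case FAIL:
                throw new TaskFailure("Invalid data, stopping task: " + reason);
            case SKIP:
                requestAbort.run(); // e.g. transactionContext.abortTransaction()
                break;
        }
    }

    public static void main(String[] args) {
        handleInvalidRecord(parse("skip"), "record count mismatch",
                () -> System.out.println("abort requested"));
        try {
            handleInvalidRecord(parse("fail"), "record count mismatch",
                    () -> System.out.println("abort requested"));
        } catch (TaskFailure e) {
            System.out.println("task failed: " + e.getMessage());
        }
    }
}
```

Under SKIP, the abort still lets the runtime commit the source offsets, so the skipped data is not read again; under FAIL, the task stops and nothing is silently discarded.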
Cheers,

Chris (not Chrise 🙂)

On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY <nittybe...@gmail.com> wrote:

Hi Chrise,

Thanks for sharing the details.

Regarding the use case: for the Asn1 source connector, we have a use case to validate the number of records in the file against the number of records in the header. Currently, if validation fails, we do not send the last record to the topic. But after introducing exactly-once with connector-defined transaction boundaries, I can see that if I call an abort when the validation fails, the transaction is not getting completed because it is not committing the transaction offset. I saw that the transaction state changed to CompleteAbort, so for my next transaction I get an InvalidProducerEpochException and then the task stops. When I tried calling the abort after sending the last record to the topic, the transaction completed.

I don't know if I am doing anything wrong here.

Please advise.
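Nitty's abort path and the empty-transaction bug Chris mentions (KAFKA-14799) suggest a connector-side guard: request an abort only when the transaction actually contains records, and when using the per-record variant, pass a record that is part of the batch being returned from poll. The sketch below is a hypothetical workaround, not a fix from this thread; TxnAborts stands in for the two abortTransaction methods of TransactionContext (the real per-record variant takes a SourceRecord, not a String).

```java
import java.util.ArrayList;
import java.util.List;

class AbortGuard {
    // Stand-in (assumption) for TransactionContext's abort methods; the real
    // per-record variant takes a SourceRecord, not a String.
    interface TxnAborts {
        void abortTransaction();
        void abortTransaction(String record);
    }

    static class RecordingAborts implements TxnAborts {
        final List<String> calls = new ArrayList<>();
        public void abortTransaction() { calls.add("abort()"); }
        public void abortTransaction(String record) { calls.add("abort(" + record + ")"); }
    }

    // Hypothetical helper tracking how many records the open transaction holds.
    static class TransactionTracker {
        private int recordsInOpenTransaction = 0;

        /** Call once per poll with the batch about to be returned. */
        List<String> finishPoll(List<String> batch, boolean abort, TxnAborts txn) {
            if (!abort) {
                recordsInOpenTransaction += batch.size();
                return batch;
            }
            if (!batch.isEmpty()) {
                // Per-record variant: the record must be in the batch returned from poll.
                txn.abortTransaction(batch.get(batch.size() - 1));
            } else if (recordsInOpenTransaction > 0) {
                // No new records, but earlier batches are still in the open transaction.
                txn.abortTransaction();
            }
            // Otherwise the transaction is empty: skip the request (see KAFKA-14799).
            recordsInOpenTransaction = 0; // nothing remains open after this batch
            return batch;
        }
    }

    public static void main(String[] args) {
        RecordingAborts txn = new RecordingAborts();
        TransactionTracker tracker = new TransactionTracker();
        tracker.finishPoll(List.of(), true, txn);           // first record of a file is invalid
        tracker.finishPoll(List.of("r1", "r2"), false, txn);
        tracker.finishPoll(List.of(), true, txn);           // count mismatch found, no new records
        tracker.finishPoll(List.of("r3"), true, txn);
        System.out.println(txn.calls); // prints [abort(), abort(r3)]
    }
}
```

One trade-off: when the guard skips the request, no transaction completes, so no source offset is committed for the invalid data. The connector then has to make sure it does not re-read that data, for example by covering it with the offset of the next committed record.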
Thanks,
Nitty

On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Nitty,

We've recently added some documentation on implementing exactly-once source connectors here:
https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors

To quote a relevant passage from those docs:

> In order for a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages.

So, as long as your source connector is able to use the Kafka Connect framework's offsets API correctly, it shouldn't be necessary to make any other code changes to the connector.
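For a file-based connector like this one, "meaningful source offsets" could be a source partition that identifies the file and an offset that identifies the last record emitted from it. The sketch below assumes hypothetical keys (filename, record) and simulates the offset store with a HashMap; in a real task these maps would be passed to each SourceRecord and read back with context.offsetStorageReader().offset(partition) when the task starts.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class FileOffsets {
    // Hypothetical key names chosen for this sketch.
    static Map<String, Object> sourcePartition(String fileName) {
        return Map.of("filename", fileName);
    }

    static Map<String, Object> sourceOffset(long recordIndex) {
        return Map.of("record", recordIndex);
    }

    /** Index of the first record to emit, given the last committed offset (or null). */
    static long resumeFrom(Map<String, Object> committedOffset) {
        if (committedOffset == null) {
            return 0L; // nothing committed yet for this file
        }
        // Number rather than Long, in case the stored value comes back as another boxed type.
        return ((Number) committedOffset.get("record")).longValue() + 1;
    }

    public static void main(String[] args) {
        List<String> records = List.of("r0", "r1", "r2", "r3", "r4");
        // Simulated offset store: partition -> last committed offset.
        Map<Map<String, Object>, Map<String, Object>> store = new HashMap<>();
        store.put(sourcePartition("ABC"), sourceOffset(2)); // r0..r2 were committed earlier
        long start = resumeFrom(store.get(sourcePartition("ABC")));
        for (long i = start; i < records.size(); i++) {
            // A real task would build a SourceRecord with these two maps.
            System.out.println(sourcePartition("ABC") + " " + sourceOffset(i) + " -> " + records.get((int) i));
        }
    }
}
```

Resuming at the committed offset plus one is what lets the connector continue "at the exact position" after a restart without dropping or duplicating records.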
To enable exactly-once support for source connectors on your Connect cluster, see the docs section here:
https://kafka.apache.org/documentation/#connect_exactlyoncesource

With regard to transactions, a transactional producer is always created automatically for your connector by the Connect runtime when exactly-once support is enabled on the worker. The only reason to set "transaction.boundary" to "connector" is if your connector would like to explicitly define its own transaction boundaries. In this case, it sounds like that may be what you want; I just want to make sure to call out that in either case, you should not be directly instantiating a producer in your connector code, but let the Kafka Connect runtime do that for you, and just worry about returning the right records from SourceTask::poll (and possibly defining custom transactions using the TransactionContext API).

With respect to your question about committing or aborting in certain circumstances, it'd be useful to know more about your use case, since it may not be necessary to define custom transaction boundaries in your connector at all.
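The settings discussed here can be expressed as Java maps, for example before rendering them into a worker properties file or a connector's JSON config. The worker property exactly.once.source.support and the connector properties exactly.once.support and transaction.boundary are the ones described in the Kafka documentation linked above; the helper names and structure are illustrative. Note that no producer is created anywhere: the runtime creates the transactional producer itself.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

class ExactlyOnceConfig {
    // Accepted values of the connector-level transaction.boundary property.
    static final Set<String> TRANSACTION_BOUNDARIES = Set.of("poll", "interval", "connector");

    /** Worker-level setting that turns on exactly-once support for source connectors. */
    static Map<String, String> workerConfig() {
        Map<String, String> worker = new LinkedHashMap<>();
        worker.put("exactly.once.source.support", "enabled");
        return worker;
    }

    /** Connector-level settings matching the configuration used in this thread. */
    static Map<String, String> connectorConfig(String boundary) {
        if (!TRANSACTION_BOUNDARIES.contains(boundary)) {
            throw new IllegalArgumentException("Unknown transaction.boundary: " + boundary);
        }
        Map<String, String> connector = new LinkedHashMap<>();
        connector.put("exactly.once.support", "required");
        connector.put("transaction.boundary", boundary);
        return connector;
    }

    /** Renders a config map in key=value form, one entry per line. */
    static String toProperties(Map<String, String> config) {
        StringBuilder sb = new StringBuilder();
        config.forEach((key, value) -> sb.append(key).append('=').append(value).append('\n'));
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(toProperties(workerConfig()));
        System.out.print(toProperties(connectorConfig("connector")));
    }
}
```

Using "connector" is only needed when the task calls the TransactionContext API itself; with "poll" (the default) or "interval", the runtime decides where transactions end.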
Cheers,

Chris

On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY <nittybe...@gmail.com> wrote:

Hi Team,

Adding on top of this, I tried creating a TransactionContext object and calling the commitTransaction and abortTransaction methods in source connectors. But the main problem I saw is that if there is any error while parsing the record, Connect calls an abort, but we have a use case to call commit in some cases. Is that a valid use case in terms of Kafka Connect?

Another question: should I use a transactional producer instead of creating an object of TransactionContext? Below is the connector configuration I am using.

exactly.once.support: "required"
transaction.boundary: "connector"

Could you please help me here?

Thanks,
Nitty

On Tue, Mar 7, 2023 at 12:29 AM NITTY BENNY <nittybe...@gmail.com> wrote:

Hi Team,

I am trying to implement exactly-once behavior in our source connector.
Is there any sample source connector implementation available to have a look at?

Regards,
Nitty