Hi Chris, I don't understand why a graceful shutdown would happen during a commit operation. Am I misunderstanding something here? I see this when I have a batch of 2 valid records and the record in the second batch is invalid. In that case I want to commit the valid records, so I call commit and return an empty list from poll() for the current batch. Then, when the next file comes in and poll() returns new records, I see InvalidProducerEpochException. Please advise.
Thanks, Nitty On Mon, Mar 13, 2023 at 5:33 PM NITTY BENNY <nittybe...@gmail.com> wrote: > Hi Chris, > > The difference is in the Task classes; there is no difference in the key/value > converters. > > I don’t see log messages for graceful shutdown. I am not clear on what you > mean by shutting down the task. > > I called the commit operation for the successful records. Should I perform > any other steps if I have an invalid record? > Please advise. > > Thanks, > Nitty > > On Mon, Mar 13, 2023 at 3:42 PM Chris Egerton <chr...@aiven.io.invalid> > wrote: > >> Hi Nitty, >> >> Thanks again for all the details here, especially the log messages. >> >> > The issue mentioned below is happening for the JSON connector only. Is there >> any difference between the ASN.1, binary, CSV, and JSON connectors? >> >> Can you clarify if the difference here is in the Connector/Task classes, >> or if it's in the key/value converters that are configured for the >> connector? The key/value converters are configured using the >> "key.converter" and "value.converter" properties and, if problems arise with >> them, the task will fail and, if it has a non-empty ongoing transaction, >> that transaction will be automatically aborted since we close the task's >> Kafka producer when it fails (or shuts down gracefully). >> >> With regard to these log messages: >> >> > org.apache.kafka.common.errors.ProducerFencedException: There is a newer >> producer with the same transactionalId which fences the current one. >> >> It looks like your tasks aren't shutting down gracefully in time, which >> causes them to be fenced out by the Connect framework later on. Do you see >> messages like "Graceful stop of task <TASK ID HERE> failed" in the logs >> for >> your Connect worker?
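To make the fencing Chris describes a little more concrete, here is a deliberately simplified model in Java. It is not Kafka's implementation, and the class and method names (EpochFencingModel, register, abortTimedOut, accepts) are hypothetical. The assumption it encodes is that the broker keeps one current epoch per transactional.id, that this epoch moves forward when a newer producer registers the same transactional.id or when an open transaction is aborted for running past its timeout, and that writes carrying an older epoch are rejected. In the logs quoted in this thread, that kind of rejection shows up as InvalidProducerEpochException followed by ProducerFencedException.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of epoch-based fencing; not Kafka's real broker logic.
public class EpochFencingModel {

    // Current epoch per transactional.id, as tracked by this model "coordinator".
    private final Map<String, Integer> epochs = new HashMap<>();

    // A producer registering its transactional.id (think initTransactions) bumps the epoch.
    int register(String transactionalId) {
        return epochs.merge(transactionalId, 0, (old, ignored) -> old + 1);
    }

    // Assumption of the model: aborting a timed-out transaction also bumps the epoch.
    void abortTimedOut(String transactionalId) {
        epochs.computeIfPresent(transactionalId, (id, epoch) -> epoch + 1);
    }

    // Writes are accepted only from the producer holding the current epoch.
    boolean accepts(String transactionalId, int producerEpoch) {
        Integer current = epochs.get(transactionalId);
        return current != null && current == producerEpoch;
    }

    public static void main(String[] args) {
        EpochFencingModel coordinator = new EpochFencingModel();
        String id = "connect-cluster-json-sftp-source-connector-0";

        int oldTask = coordinator.register(id);
        System.out.println(coordinator.accepts(id, oldTask)); // first producer is current

        // Case 1: a newer producer with the same transactional.id registers.
        int newTask = coordinator.register(id);
        System.out.println(coordinator.accepts(id, oldTask)); // older producer is fenced
        System.out.println(coordinator.accepts(id, newTask)); // newer producer is current

        // Case 2: the open transaction runs past its timeout and is aborted.
        coordinator.abortTimedOut(id);
        System.out.println(coordinator.accepts(id, newTask)); // now holds a stale epoch
    }
}
```

Running main prints true, false, true, false: the first producer is fenced as soon as a second one registers, and the second is fenced once its transaction is aborted on timeout.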
>> >> Cheers, >> >> Chris >> >> On Mon, Mar 13, 2023 at 10:58 AM NITTY BENNY <nittybe...@gmail.com> >> wrote: >> >> > Hi Chris, >> > >> > As you said, the message below appears when I call abort for an invalid >> > record; then, for the next transaction, I see the message below and the >> > connector stops. >> > 2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0] >> Aborting >> > transaction for batch as requested by connector >> > (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask) >> > [task-thread-json-sftp-source-connector-0] >> > 2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0] >> [Producer >> > clientId=connector-producer-json-sftp-source-connector-0, >> > transactionalId=connect-cluster-json-sftp-source-connector-0] Aborting >> > incomplete transaction (org.apache.kafka.clients.producer.KafkaProducer) >> > [task-thread-json-sftp-source-connector-0] >> > >> > The InvalidProducerEpoch issue happens when I call commit for an invalid >> > record: in the next transaction I get an InvalidProducerEpochException, and the log messages are copied in the >> previous >> > email. I don't know if your bug fix will also fix this. I am >> using >> > Kafka 3.3.1 for now. >> > >> > Thanks, >> > Nitty >> > >> > >> > On Mon, Mar 13, 2023 at 10:47 AM NITTY BENNY <nittybe...@gmail.com> >> wrote: >> > >> > > Hi Chris, >> > > >> > > The issue mentioned below is happening for the JSON connector only. Is >> there >> > > any difference between the ASN.1, binary, CSV, and JSON connectors? >> > > >> > > Thanks, >> > > Nitty >> > > >> > > On Mon, Mar 13, 2023 at 9:16 AM NITTY BENNY <nittybe...@gmail.com> >> > wrote: >> > > >> > >> Hi Chris, >> > >> >> > >> Sorry Chris, I am not able to reproduce the above issue. >> > >> >> > >> I want to share with you one more use case I found.
>> > >> In this use case, the first batch returns 2 valid records, and >> then >> > in >> > >> the next batch there is an invalid record. Below is the __transaction_state >> > >> topic when I call commit while processing the invalid record. >> > >> >> > >> >> > >> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0, >> > >> producerId=11, producerEpoch=2, txnTimeoutMs=60000, state=*Ongoing*, >> > >> pendingState=None, topicPartitions=HashSet(streams-input-2), >> > >> txnStartTimestamp=1678620463834, >> txnLastUpdateTimestamp=1678620463834) >> > >> >> > >> Then, after some time, I also saw the states below: >> > >> >> > >> >> > >> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0, >> > >> producerId=11, producerEpoch=3, txnTimeoutMs=60000, >> > state=*PrepareAbort*, >> > >> pendingState=None, topicPartitions=HashSet(streams-input-2), >> > >> txnStartTimestamp=1678620463834, >> txnLastUpdateTimestamp=1678620526119) >> > >> >> > >> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0, >> > >> producerId=11, producerEpoch=3, txnTimeoutMs=60000, >> > state=*CompleteAbort*, >> > >> pendingState=None, topicPartitions=HashSet(), >> > >> txnStartTimestamp=1678620463834, >> txnLastUpdateTimestamp=1678620526121) >> > >> >> > >> Later, for the next transaction, when the task returns the first batch, these >> are >> > >> the logs I see: >> > >> >> > >> Transiting to abortable error state due to >> > >> org.apache.kafka.common.errors.InvalidProducerEpochException: >> Producer >> > >> attempted to produce with an old epoch.
>> > >> (org.apache.kafka.clients.producer.internals.TransactionManager) >> > >> [kafka-producer-network-thread | >> > >> connector-producer-json-sftp-source-connector-0] >> > >> 2023-03-12 11:32:45,220 ERROR [json-sftp-source-connector|task-0] >> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed >> to >> > send >> > >> record to streams-input: >> > >> (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask) >> > >> [kafka-producer-network-thread | >> > >> connector-producer-json-sftp-source-connector-0] >> > >> org.apache.kafka.common.errors.InvalidProducerEpochException: >> Producer >> > >> attempted to produce with an old epoch. >> > >> 2023-03-12 11:32:45,222 INFO [json-sftp-source-connector|task-0] >> > >> [Producer clientId=connector-producer-json-sftp-source-connector-0, >> > >> transactionalId=connect-cluster-json-sftp-source-connector-0] >> > Transiting to >> > >> fatal error state due to >> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a >> newer >> > >> producer with the same transactionalId which fences the current one. >> > >> (org.apache.kafka.clients.producer.internals.TransactionManager) >> > >> [kafka-producer-network-thread | >> > >> connector-producer-json-sftp-source-connector-0] >> > >> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0] >> > >> [Producer clientId=connector-producer-json-sftp-source-connector-0, >> > >> transactionalId=connect-cluster-json-sftp-source-connector-0] >> Aborting >> > >> producer batches due to fatal error >> > >> (org.apache.kafka.clients.producer.internals.Sender) >> > >> [kafka-producer-network-thread | >> > >> connector-producer-json-sftp-source-connector-0] >> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a >> newer >> > >> producer with the same transactionalId which fences the current one. 
>> > >> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0] >> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed >> to >> > >> flush offsets to storage: >> > >> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask) >> > >> [kafka-producer-network-thread | >> > >> connector-producer-json-sftp-source-connector-0] >> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a >> newer >> > >> producer with the same transactionalId which fences the current one. >> > >> 2023-03-12 11:32:45,224 ERROR [json-sftp-source-connector|task-0] >> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed >> to >> > send >> > >> record to streams-input: >> > >> (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask) >> > >> [kafka-producer-network-thread | >> > >> connector-producer-json-sftp-source-connector-0] >> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a >> newer >> > >> producer with the same transactionalId which fences the current one. >> > >> 2023-03-12 11:32:45,222 ERROR >> > [json-sftp-source-connector|task-0|offsets] >> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed >> to >> > >> commit producer transaction >> > >> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask) >> > >> [task-thread-json-sftp-source-connector-0] >> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a >> newer >> > >> producer with the same transactionalId which fences the current one. >> > >> 2023-03-12 11:32:45,225 ERROR [json-sftp-source-connector|task-0] >> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Task >> threw >> > an >> > >> uncaught and unrecoverable exception. 
Task is being killed and will >> not >> > >> recover until manually restarted >> > >> (org.apache.kafka.connect.runtime.WorkerTask) >> > >> [task-thread-json-sftp-source-connector-0] >> > >> >> > >> Do you know why it shows an abort state even though I call commit? >> > >> >> > >> I tested one more scenario. When I call commit, I see the following: >> > >> >> > >> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0, >> > >> producerId=11, producerEpoch=2, txnTimeoutMs=60000, state=*Ongoing*, >> > >> pendingState=None, topicPartitions=HashSet(streams-input-2), >> > >> txnStartTimestamp=1678620463834, >> txnLastUpdateTimestamp=1678620463834) >> > >> Then, if I drop the next file before the state changes to abort, >> > I >> > >> don't see any issues: both the previous transaction >> > >> and the current transaction are committed. >> > >> >> > >> Thank you for your support. >> > >> >> > >> Thanks, >> > >> Nitty >> > >> >> > >> On Fri, Mar 10, 2023 at 8:04 PM Chris Egerton >> <chr...@aiven.io.invalid> >> > >> wrote: >> > >> >> > >>> Hi Nitty, >> > >>> >> > >>> > I called commitTransaction when I reached the first error record, >> but >> > >>> the commit is not happening for me. Kafka Connect tries to abort the >> > >>> transaction automatically >> > >>> >> > >>> This is really interesting--are you certain that your task never >> > invoked >> > >>> TransactionContext::abortTransaction in this case? I'm looking over >> the >> > >>> code base and it seems fairly clear that the only thing that could >> > >>> trigger >> > >>> a call to KafkaProducer::abortTransaction is a request by the task >> to >> > >>> abort >> > >>> a transaction (either for a next batch, or for a specific record).
>> It >> > may >> > >>> help to run the connector in a debugger and/or look for "Aborting >> > >>> transaction for batch as requested by connector" or "Aborting >> > transaction >> > >>> for record on topic <TOPIC NAME HERE> as requested by connector" log >> > >>> messages (which will be emitted at INFO level by >> > >>> the org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask >> class >> > if >> > >>> the task is requesting an abort). >> > >>> >> > >>> Regardless, I'll work on a fix for the bug with aborting empty >> > >>> transactions. Thanks for helping uncover that one! >> > >>> >> > >>> Cheers, >> > >>> >> > >>> Chris >> > >>> >> > >>> On Thu, Mar 9, 2023 at 6:36 PM NITTY BENNY <nittybe...@gmail.com> >> > wrote: >> > >>> >> > >>> > Hi Chris, >> > >>> > >> > >>> > We have a use case where we want to commit the previous successful records, stop >> the >> > >>> > processing of the current file, and move on to the next file. To >> > >>> achieve >> > >>> > that, I called commitTransaction when I reached the first error >> record, >> > >>> but >> > >>> > the commit is not happening for me. Kafka Connect tries to abort the >> > >>> > transaction automatically; I checked the __transaction_state topic >> and >> > >>> > the states are marked PrepareAbort and CompleteAbort. Do you know why >> > Kafka >> > >>> > Connect automatically invokes abort instead of the commit I >> > >>> > called? >> > >>> > As a result, when I try to parse the next file, say ABC, I >> > see >> > >>> the >> > >>> > log "Aborting incomplete transaction" and the ERROR "failed to send >> > >>> record to >> > >>> > topic", and we lose the first batch of records from the current >> > >>> transaction >> > >>> > in file ABC.
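The TransactionMetadata entries quoted elsewhere in this thread allow a quick arithmetic check on where the abort came from. The sketch below uses only values copied from those entries: txnStartTimestamp=1678620463834 (state Ongoing, producerEpoch=2), txnLastUpdateTimestamp=1678620526119 when the state became PrepareAbort (producerEpoch=3), and txnTimeoutMs=60000. The class and method names are arbitrary.

```java
// Arithmetic check on the __transaction_state values quoted in this thread.
public class TxnTimeoutCheck {

    // How long the transaction had been open when its state last changed.
    static long openForMs(long txnStartTimestamp, long txnLastUpdateTimestamp) {
        return txnLastUpdateTimestamp - txnStartTimestamp;
    }

    public static void main(String[] args) {
        long txnStart = 1678620463834L;     // state=Ongoing, producerEpoch=2
        long prepareAbort = 1678620526119L; // state=PrepareAbort, producerEpoch=3
        long txnTimeoutMs = 60000L;         // txnTimeoutMs from the same entries

        long openFor = openForMs(txnStart, prepareAbort);
        System.out.println("Transaction open for " + openFor + " ms");
        System.out.println("Exceeded txnTimeoutMs: " + (openFor > txnTimeoutMs));
    }
}
```

The transaction stayed open for 62285 ms, longer than txnTimeoutMs, and the epoch moved from 2 to 3 at the abort. That is consistent with the broker aborting a transaction that exceeded its timeout (the producer's transaction.timeout.ms), rather than Connect turning the requested commit into an abort, and it would also explain why dropping the next file before the abort avoids the problem. This is an inference from the quoted numbers, not a confirmed diagnosis; if it holds, the question to chase is why the requested commit had not completed within the transaction timeout.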
>> > >>> > >> > >>> > Is it possible that there's a case where an abort is being >> requested >> > >>> while >> > >>> > the current transaction is empty (i.e., the task hasn't returned >> any >> > >>> > records from SourceTask::poll since the last transaction was >> > >>> > committed/aborted)? --- Yes, that case is possible for us. There >> is a >> > >>> case >> > >>> > where the first record itself is an error record. >> > >>> > >> > >>> > Thanks, >> > >>> > Nitty >> > >>> > >> > >>> > On Thu, Mar 9, 2023 at 3:48 PM Chris Egerton >> <chr...@aiven.io.invalid >> > > >> > >>> > wrote: >> > >>> > >> > >>> > > Hi Nitty, >> > >>> > > >> > >>> > > Thanks for the code examples and the detailed explanations, >> this is >> > >>> > really >> > >>> > > helpful! >> > >>> > > >> > >>> > > > Say I have a file with 5 records and the batch size is 2, and >> my >> > 3rd >> > >>> batch contains >> > >>> > > one error record; then in that batch I don't have a >> > valid >> > >>> > > record to call commit or abort with. But I still want to commit all the >> > previous >> > >>> > > batches that were successfully parsed. How do I do that? >> > >>> > > >> > >>> > > An important thing to keep in mind with the TransactionContext >> API >> > is >> > >>> > that >> > >>> > > all records that a task returns from SourceTask::poll are >> > implicitly >> > >>> > > included in a transaction. Invoking >> > >>> SourceTaskContext::transactionContext >> > >>> > > doesn't alter this or cause transactions to start being used; >> > >>> everything >> > >>> > is >> > >>> > > already in a transaction, and the Connect runtime automatically >> > >>> begins >> > >>> > > transactions for any records it sees from the task if it hasn't >> > >>> already >> > >>> > > begun one. It's also valid to return a null or empty list of >> > records >> > >>> from >> > >>> > > SourceTask::poll.
So in this case, you can invoke >> > >>> > > transactionContext.commitTransaction() (the no-args variant) and >> > >>> return >> > >>> > an >> > >>> > > empty batch from SourceTask::poll, which will cause the >> transaction >> > >>> > > containing the 4 valid records that were returned in the last 2 >> > >>> batches >> > >>> > to >> > >>> > > be committed. >> > >>> > > >> > >>> > > FWIW, I would be a little cautious about this approach. Many >> times >> > >>> it's >> > >>> > > better to fail fast on invalid data; it might be worth it to at >> > least >> > >>> > allow >> > >>> > > users to configure whether the connector fails on invalid data, >> or >> > >>> > silently >> > >>> > > skips over it (which is what happens when transactions are >> > aborted). >> > >>> > > >> > >>> > > > Why does abort not work without adding the last record to the >> > >>> list? >> > >>> > > >> > >>> > > Is it possible that there's a case where an abort is being >> > requested >> > >>> > while >> > >>> > > the current transaction is empty (i.e., the task hasn't returned >> > any >> > >>> > > records from SourceTask::poll since the last transaction was >> > >>> > > committed/aborted)? I think this may be a bug in the Connect >> > >>> framework >> > >>> > > where we don't check to see if a transaction is already open >> when a >> > >>> task >> > >>> > > requests that a transaction be aborted, which can cause tasks to >> > fail >> > >>> > (see >> > >>> > > https://issues.apache.org/jira/browse/KAFKA-14799 for more >> > details). >> > >>> > > >> > >>> > > Cheers, >> > >>> > > >> > >>> > > Chris >> > >>> > > >> > >>> > > >> > >>> > > On Wed, Mar 8, 2023 at 6:44 PM NITTY BENNY < >> nittybe...@gmail.com> >> > >>> wrote: >> > >>> > > >> > >>> > > > Hi Chris, >> > >>> > > > >> > >>> > > > I am not sure if you are able to see the images I shared with >> > you.
>> > >>> > > > Copying the code snippet below,
>> > >>> > > >
>> > >>> > > > if (expectedRecordCount >= 0) {
>> > >>> > > >     int missingCount = expectedRecordCount - (int) this.recordOffset() - 1;
>> > >>> > > >     if (missingCount > 0) {
>> > >>> > > >         if (transactionContext != null) {
>> > >>> > > >             isMissedRecords = true;
>> > >>> > > >         } else {
>> > >>> > > >             throw new DataException(String.format("Missing %d records (expecting %d, actual %d)", missingCount, expectedRecordCount, this.recordOffset()));
>> > >>> > > >         }
>> > >>> > > >     } else if (missingCount < 0) {
>> > >>> > > >         if (transactionContext != null) {
>> > >>> > > >             isMissedRecords = true;
>> > >>> > > >         } else {
>> > >>> > > >             throw new DataException(String.format("Too many records (expecting %d, actual %d)", expectedRecordCount, this.recordOffset()));
>> > >>> > > >         }
>> > >>> > > >     }
>> > >>> > > > }
>> > >>> > > > addLastRecord(records, null, value);
>> > >>> > > > }
>> > >>> > > >
>> > >>> > > > //asn1 or binary abort
>> > >>> > > > if ((config.parseErrorThreshold != null && parseErrorCount >= config.parseErrorThreshold
>> > >>> > > >         && lastbatch && transactionContext != null)
>> > >>> > > >         || (isMissedRecords && transactionContext != null && lastbatch)) {
>> > >>> > > >     log.info("Transaction is aborting");
>> > >>> > > >     log.info("records = {}", records);
>> > >>> > > >     if (!records.isEmpty()) {
>> > >>> > > >         log.info("with record");
>> > >>> > > >         transactionContext.abortTransaction(records.get(records.size() - 1));
>> > >>> > > >     } else {
>> > >>> > > >         log.info("without record");
>> > >>> > > >         transactionContext.abortTransaction();
>> > >>> > > >     }
>> > >>> > > >
>> > >>> > > > Thanks, >> > >>> > > > Nitty >> > >>> > > > >> > >>> > > > On Wed, Mar 8, 2023 at 11:38 PM NITTY BENNY < >> >
nittybe...@gmail.com> >> > >>> > > wrote: >> > >>> > > > >> > >>> > > >> Hi Chris, >> > >>> > > >> Sorry for the typo in my previous email. >> > >>> > > >> >> > >>> > > >> Regarding point 2, "the task returns a batch of records >> from >> > >>> > > >> SourceTask::poll (and, if using >> > >>> > > >> the per-record API provided by the TransactionContext class, >> > >>> includes >> > >>> > > >> at least one record that should trigger a transaction >> > commit/abort >> > >>> in >> > >>> > > >> that batch)": >> > >>> > > >> what if I am using the API without passing a record? We have >> 2 >> > >>> types >> > >>> > of >> > >>> > > >> use cases. In the first, on encountering an error record, we >> want to >> > >>> > commit >> > >>> > > >> the previous successful batches and disregard the failed record >> and >> > >>> > any upcoming >> > >>> > > >> batches. In this case we create the transactionContext just >> > >>> before >> > >>> > > reading >> > >>> > > >> the file (the file is our transaction boundary). Say I have a >> file >> > >>> with >> > >>> > 5 >> > >>> > > >> records and the batch size is 2, and my 3rd batch contains one >> > error >> > >>> > record; >> > >>> > > >> then in that batch I don't have a valid record to call >> commit or >> > >>> > abort with. >> > >>> > > But >> > >>> > > >> I still want to commit all the previous batches that were >> successfully >> > >>> > parsed. >> > >>> > > >> How do I do that? >> > >>> > > >> >> > >>> > > >> The second use case is where I want to abort a transaction if the >> > >>> record >> > >>> > > >> count doesn't match. >> > >>> > > >> Code snippet: >> > >>> > > >> [image: image.png] >> > >>> > > >> There are no error records in this case.
As you can see, I added >> the >> > >>> > > transactionContext >> > >>> > > >> check to implement exactly-once. Without >> transactions, the code just threw the exception without >> calling >> > the >> > >>> > > >> addLastRecord() method, and the catch block just logged >> the >> > >>> message >> > >>> > > and >> > >>> > > >> returned the list of records, without the last record, to >> poll(). To >> > >>> make >> > >>> > it >> > >>> > > >> work, I now call addLastRecord() in this case, so >> the exception is >> > >>> not >> > >>> > > >> thrown and the list includes the last record as well. >> Then I >> > >>> called >> > >>> > > the >> > >>> > > >> abort, and everything got aborted. Why does abort not work >> without >> > >>> adding >> > >>> > > the >> > >>> > > >> last record to the list? >> > >>> > > >> [image: image.png] >> > >>> > > >> >> > >>> > > >> Code to call abort. >> > >>> > > >> >> > >>> > > >> >> > >>> > > >> >> > >>> > > >> >> > >>> > > >> Thanks, >> > >>> > > >> Nitty >> > >>> > > >> >> > >>> > > >> On Wed, Mar 8, 2023 at 4:26 PM Chris Egerton >> > >>> <chr...@aiven.io.invalid >> > >>> > > >> > >>> > > >> wrote: >> > >>> > > >> >> > >>> > > >>> Hi Nitty, >> > >>> > > >>> >> > >>> > > >>> I'm a little confused about what you mean by this part: >> > >>> > > >>> >> > >>> > > >>> > the transaction is not getting completed because it is not >> > >>> committing >> > >>> > the >> > >>> > > >>> transaction offset. >> > >>> > > >>> >> > >>> > > >>> The only conditions required for a transaction to be >> completed >> > >>> when a >> > >>> > > >>> connector is defining its own transaction boundaries are: >> > >>> > > >>> >> > >>> > > >>> 1. The task requests a transaction commit/abort from the >> > >>> > > >>> TransactionContext >> > >>> > > >>> 2.
The task returns a batch of records from SourceTask::poll >> > >>> (and, if >> > >>> > > >>> using >> > >>> > > >>> the per-record API provided by the TransactionContext class, >> > >>> includes >> > >>> > > at >> > >>> > > >>> least one record that should trigger a transaction >> commit/abort >> > >>> in >> > >>> > that >> > >>> > > >>> batch) >> > >>> > > >>> >> > >>> > > >>> The Connect runtime should automatically commit source >> offsets >> > to >> > >>> > Kafka >> > >>> > > >>> whenever a transaction is completed, either by commit or >> abort. >> > >>> This >> > >>> > is >> > >>> > > >>> because transactions should only be aborted for data that >> > should >> > >>> > never >> > >>> > > be >> > >>> > > >>> re-read by the connector; if there is a validation error >> that >> > >>> should >> > >>> > be >> > >>> > > >>> handled by reconfiguring the connector, then the task should >> > >>> throw an >> > >>> > > >>> exception instead of aborting the transaction. >> > >>> > > >>> >> > >>> > > >>> If possible, do you think you could provide a brief code >> > snippet >> > >>> > > >>> illustrating what your task is doing that's causing issues? >> > >>> > > >>> >> > >>> > > >>> Cheers, >> > >>> > > >>> >> > >>> > > >>> Chris (not Chrise 🙂) >> > >>> > > >>> >> > >>> > > >>> On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY < >> > >>> nittybe...@gmail.com> >> > >>> > > >>> wrote: >> > >>> > > >>> >> > >>> > > >>> > Hi Chrise, >> > >>> > > >>> > >> > >>> > > >>> > Thanks for sharing the details. >> > >>> > > >>> > >> > >>> > > >>> > Regarding the use case: for the ASN.1 source connector, we need >> to >> > >>> > validate >> > >>> > > the >> > >>> > > >>> > number of records in the file against the number of >> > >>> records >> > >>> > in >> > >>> > > >>> the >> > >>> > > >>> > header. Currently, if validation fails, we do not >> send >> > >>> the >> > >>> > last >> > >>> > > >>> > record to the topic.
But after introducing exactly-once >> with >> > >>> > > connector-defined >> > >>> > > >>> > transaction boundaries, I can see that if I call abort >> when >> > >>> the >> > >>> > > >>> validation >> > >>> > > >>> > fails, the transaction is not getting completed because it is >> not >> > >>> > > >>> committing the >> > >>> > > >>> > transaction offset. I saw that the transaction state changed >> to >> > >>> > > >>> CompleteAbort. >> > >>> > > >>> > So for my next transaction I get an >> > >>> > InvalidProducerEpochException, >> > >>> > > >>> and >> > >>> > > >>> > the task stops after that. When I tried calling abort >> after >> > >>> > sending the >> > >>> > > >>> last >> > >>> > > >>> > record to the topic, the transaction completed. >> > >>> > > >>> > >> > >>> > > >>> > I don't know if I am doing anything wrong here. >> > >>> > > >>> > >> > >>> > > >>> > Please advise. >> > >>> > > >>> > Thanks, >> > >>> > > >>> > Nitty >> > >>> > > >>> > >> > >>> > > >>> > On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton >> > >>> > > <chr...@aiven.io.invalid >> > >>> > > >>> > >> > >>> > > >>> > wrote: >> > >>> > > >>> > >> > >>> > > >>> > > Hi Nitty, >> > >>> > > >>> > > >> > >>> > > >>> > > We've recently added some documentation on implementing >> > >>> > > exactly-once >> > >>> > > >>> > source >> > >>> > > >>> > > connectors here: >> > >>> > > >>> > > >> > >>> > > >>> > >> > >>> > > >>> >> > >>> > > >> > >>> > >> > >>> >> > >> https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors >> > >>> > > >>> > > .
>> > >>> > > >>> > > To quote a relevant passage from those docs: >> > >>> > > >>> > > >> > >>> > > >>> > > > In order for a source connector to take advantage of >> this >> > >>> > > support, >> > >>> > > >>> it >> > >>> > > >>> > > must be able to provide meaningful source offsets for >> each >> > >>> record >> > >>> > > >>> that it >> > >>> > > >>> > > emits, and resume consumption from the external system >> at >> > the >> > >>> > exact >> > >>> > > >>> > > position corresponding to any of those offsets without >> > >>> dropping >> > >>> > or >> > >>> > > >>> > > duplicating messages. >> > >>> > > >>> > > >> > >>> > > >>> > > So, as long as your source connector is able to use the >> > Kafka >> > >>> > > Connect >> > >>> > > >>> > > framework's offsets API correctly, it shouldn't be >> > necessary >> > >>> to >> > >>> > > make >> > >>> > > >>> any >> > >>> > > >>> > > other code changes to the connector. >> > >>> > > >>> > > >> > >>> > > >>> > > To enable exactly-once support for source connectors on >> > your >> > >>> > > Connect >> > >>> > > >>> > > cluster, see the docs section here: >> > >>> > > >>> > > >> > >>> > https://kafka.apache.org/documentation/#connect_exactlyoncesource >> > >>> > > >>> > > >> > >>> > > >>> > > With regard to transactions, a transactional producer is >> > >>> always >> > >>> > > >>> created >> > >>> > > >>> > > automatically for your connector by the Connect runtime >> > when >> > >>> > > >>> exactly-once >> > >>> > > >>> > > support is enabled on the worker. The only reason to set >> > >>> > > >>> > > "transaction.boundary" to "connector" is if your >> connector >> > >>> would >> > >>> > > >>> like to >> > >>> > > >>> > > explicitly define its own transaction boundaries. 
In >> this >> > >>> case, >> > >>> > it >> > >>> > > >>> sounds >> > >>> > > >>> > > like that may be what you want; I just want to make sure to >> call >> > >>> out >> > >>> > > that >> > >>> > > >>> in >> > >>> > > >>> > > either case, you should not be directly instantiating a >> > >>> producer >> > >>> > in >> > >>> > > >>> your >> > >>> > > >>> > > connector code, but let the Kafka Connect runtime do >> that >> > for >> > >>> > you, >> > >>> > > >>> and >> > >>> > > >>> > just >> > >>> > > >>> > > worry about returning the right records from >> > SourceTask::poll >> > >>> > (and >> > >>> > > >>> > possibly >> > >>> > > >>> > > defining custom transactions using the >> TransactionContext >> > >>> API). >> > >>> > > >>> > > >> > >>> > > >>> > > With respect to your question about committing or >> aborting >> > in >> > >>> > > certain >> > >>> > > >>> > > circumstances, it'd be useful to know more about your >> use >> > >>> case, >> > >>> > > >>> since it >> > >>> > > >>> > > may not be necessary to define custom transaction >> > boundaries >> > >>> in >> > >>> > > your >> > >>> > > >>> > > connector at all. >> > >>> > > >>> > > >> > >>> > > >>> > > Cheers, >> > >>> > > >>> > > >> > >>> > > >>> > > Chris >> > >>> > > >>> > > >> > >>> > > >>> > > >> > >>> > > >>> > > >> > >>> > > >>> > > On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY < >> > >>> nittybe...@gmail.com >> > >>> > > >> > >>> > > >>> wrote: >> > >>> > > >>> > > >> > >>> > > >>> > > > Hi Team, >> > >>> > > >>> > > > >> > >>> > > >>> > > > Adding to this, I tried creating a >> > >>> TransactionContext >> > >>> > > >>> object and >> > >>> > > >>> > > > calling the commitTransaction and abortTransaction >> methods >> > >>> in >> > >>> > > source >> > >>> > > >>> > > > connectors.
>> > >>> > > >>> > > > But the main problem I saw is that if there is any >> error >> > >>> while >> > >>> > > >>> parsing >> > >>> > > >>> > > the >> > >>> > > >>> > > > record, Connect calls abort, but we have a use >> > case >> > >>> for >> > >>> > > calling >> > >>> > > >>> > commit >> > >>> > > >>> > > > in some cases. Is that a valid use case for >> Kafka >> > >>> > Connect? >> > >>> > > >>> > > > >> > >>> > > >>> > > > Another question: should I use a transactional >> producer >> > >>> > instead of >> > >>> > > >>> > > > creating a TransactionContext object? Below is the >> > >>> > connector >> > >>> > > >>> > > > configuration I am using. >> > >>> > > >>> > > > >> > >>> > > >>> > > > exactly.once.support: "required" >> > >>> > > >>> > > > transaction.boundary: "connector" >> > >>> > > >>> > > > >> > >>> > > >>> > > > Could you please help me here? >> > >>> > > >>> > > > >> > >>> > > >>> > > > Thanks, >> > >>> > > >>> > > > Nitty >> > >>> > > >>> > > > >> > >>> > > >>> > > > On Tue, Mar 7, 2023 at 12:29 AM NITTY BENNY < >> > >>> > > nittybe...@gmail.com> >> > >>> > > >>> > > wrote: >> > >>> > > >>> > > > >> > >>> > > >>> > > > > Hi Team, >> > >>> > > >>> > > > > I am trying to implement exactly-once behavior in >> our >> > >>> source >> > >>> > > >>> > connector. >> > >>> > > >>> > > > Is >> > >>> > > >>> > > > > there a sample source connector implementation >> > >>> available to >> > >>> > > >>> look >> > >>> > > >>> > > at? >> > >>> > > >>> > > > > Regards, >> > >>> > > >>> > > > > Nitty >> > >>> > > >>> > > > > >> > >>> > > >>> > > > >> > >>> > > >>> > > >> > >>> > > >>> > >> > >>> > > >>> >> > >>> > > >> >> > >>> > > >> > >>> > >> > >>> >> > >> >> > >> >
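As a rough illustration of the approach Chris suggests for the "commit earlier batches, skip the rest of the file" case (request a commit with the no-args commitTransaction() and return an empty batch from poll()), here is a self-contained Java sketch. It does not use Kafka at all: BoundaryContext is a hypothetical stand-in for TransactionContext, records are plain strings rather than SourceRecords, and the marker "INVALID" stands for a record that fails parsing. It also skips the boundary request when nothing has been returned since the last one, a conservative guard motivated by the empty-transaction abort bug tracked in KAFKA-14799. It models only the task-side bookkeeping, not what the Connect runtime does afterwards.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of task-side transaction-boundary bookkeeping; no Kafka classes used.
public class BoundarySketch {

    // Hypothetical stand-in for Kafka Connect's TransactionContext (no-args commit only).
    interface BoundaryContext {
        void commitTransaction();
    }

    private final List<String> fileRecords; // records of the file currently being read
    private final int batchSize;
    private final BoundaryContext txn;
    private int position = 0;               // index of the next record to read
    private int returnedSinceBoundary = 0;  // records returned by earlier polls of the open transaction
    private boolean fileDone = false;

    BoundarySketch(List<String> fileRecords, int batchSize, BoundaryContext txn) {
        this.fileRecords = fileRecords;
        this.batchSize = batchSize;
        this.txn = txn;
    }

    // Mimics one SourceTask::poll call; "INVALID" marks a record that fails parsing.
    List<String> poll() {
        List<String> batch = new ArrayList<>();
        if (fileDone) {
            return batch; // nothing left to read in this file
        }
        while (batch.size() < batchSize && position < fileRecords.size()) {
            String record = fileRecords.get(position++);
            if ("INVALID".equals(record)) {
                // Stop reading this file; commit what was returned so far plus
                // any valid records already collected in this batch.
                finishFile(batch.size());
                return batch; // may be empty, which is a valid poll() result
            }
            batch.add(record);
        }
        if (position >= fileRecords.size()) {
            finishFile(batch.size()); // whole file parsed cleanly: commit it
        } else {
            returnedSinceBoundary += batch.size();
        }
        return batch;
    }

    private void finishFile(int recordsInThisBatch) {
        fileDone = true;
        // Only request a boundary if the transaction actually contains records.
        if (returnedSinceBoundary + recordsInThisBatch > 0) {
            txn.commitTransaction();
        }
        returnedSinceBoundary = 0;
    }

    public static void main(String[] args) {
        List<String> file = List.of("r1", "r2", "r3", "r4", "INVALID");
        int[] commits = {0};
        BoundarySketch task = new BoundarySketch(file, 2, () -> commits[0]++);
        List<String> returned = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            returned.addAll(task.poll());
        }
        System.out.println(returned + " commits=" + commits[0]);
    }
}
```

With the 5-record, batch-size-2 file from the thread, main prints [r1, r2, r3, r4] commits=1: the four valid records are returned over two polls, the third poll hits the invalid record, requests one commit, and returns an empty list. If the very first record is invalid, no commit is requested at all.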