Hi Xiaoxia,

I am not able to see the attachments you shared with me, and I don't understand the problem you are describing. What would you like me to look at?
Thanks,
Nitty

On Thu, Mar 16, 2023 at 1:54 AM 小侠 <747359...@qq.com.invalid> wrote:

> Hi Nitty,
> I'm so sorry I forgot the signature.
> Looking forward to your reply.
>
> Thank you,
> Xiaoxia
>
> ------------------ Original Message ------------------
> *From:* "users" <nittybe...@gmail.com>;
> *Sent:* Wednesday, March 15, 2023, 6:38 PM
> *To:* "users" <users@kafka.apache.org>;
> *Subject:* Re: Exactly once kafka connect query
>
> Hi Chris,
>
> We won't be able to share the source code, since it is proprietary Amdocs code.
>
> If you have time for a call, I can show you the code and a reproduction scenario over the call. I strongly believe you can find the issue with that.
>
> Thanks,
> Nitty
>
> On Tue, Mar 14, 2023 at 3:04 PM Chris Egerton <chr...@aiven.io.invalid> wrote:
>
> Hi Nitty,
>
> Sorry, I should have clarified. The reason I'm thinking about shutdown here is that, when exactly-once support is enabled on a Kafka Connect cluster and a new set of task configurations is generated for a connector, the Connect framework makes an effort to shut down all the old task instances for that connector, and then fences out the transactional producers for all of those instances. I was thinking that this may lead to the producer exceptions you are seeing but, after double-checking this assumption, that does not appear to be the case.
>
> Would it be possible to share the source code for your connector and a reproduction scenario for what you're seeing? That may be easier than coordinating a call.
>
> Cheers,
>
> Chris
>
> On Tue, Mar 14, 2023 at 6:15 AM NITTY BENNY <nittybe...@gmail.com> wrote:
>
> Hi Chris,
>
> Is there any possibility to have a call with you? This is actually blocking our delivery, and I really want to sort this out.
>
> Thanks,
> Nitty
>
> On Mon, Mar 13, 2023 at 8:18 PM NITTY BENNY <nittybe...@gmail.com> wrote:
>
> Hi Chris,
>
> I really don't understand why a graceful shutdown would happen during a commit operation. Am I misunderstanding something here? I see this happen when I have a batch of 2 valid records and the record in the second batch is invalid. In that case I want to commit the valid records, so I called commit and sent an empty list for the current batch to poll(), and then, when the next file comes in and poll() sees new records, I see InvalidProducerEpochException.
> Please advise me.
>
> Thanks,
> Nitty
>
> On Mon, Mar 13, 2023 at 5:33 PM NITTY BENNY <nittybe...@gmail.com> wrote:
>
> Hi Chris,
>
> The difference is in the Task classes; there is no difference in the value/key converters.
>
> I don't see log messages for a graceful shutdown. I am not clear on what you mean by shutting down the task.
>
> I called the commit operation for the successful records. Should I perform any other steps if I have an invalid record?
> Please advise.
>
> Thanks,
> Nitty
>
> On Mon, Mar 13, 2023 at 3:42 PM Chris Egerton <chr...@aiven.io.invalid> wrote:
>
> Hi Nitty,
>
> Thanks again for all the details here, especially the log messages.
>
> > The below mentioned issue is happening for the Json connector only. Is there any difference between the asn1, binary, csv and json connectors?
> Can you clarify whether the difference here is in the Connector/Task classes, or whether it's in the key/value converters that are configured for the connector? The key/value converters are configured using the "key.converter" and "value.converter" properties and, if problems arise with them, the task will fail and, if it has a non-empty ongoing transaction, that transaction will be automatically aborted, since we close the task's Kafka producer when it fails (or shuts down gracefully).
>
> With regards to these log messages:
>
> > org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
>
> It looks like your tasks aren't shutting down gracefully in time, which causes them to be fenced out by the Connect framework later on. Do you see messages like "Graceful stop of task <TASK ID HERE> failed" in the logs for your Connect worker?
>
> Cheers,
>
> Chris
>
> On Mon, Mar 13, 2023 at 10:58 AM NITTY BENNY <nittybe...@gmail.com> wrote:
>
> Hi Chris,
>
> As you said, the message below appears when I call an abort if there is an invalid record; then, for the next transaction, I can see the message below, and then the connector is stopped.
>
> 2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0] Aborting transaction for batch as requested by connector (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask) [task-thread-json-sftp-source-connector-0]
> 2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0] [Producer clientId=connector-producer-json-sftp-source-connector-0, transactionalId=connect-cluster-json-sftp-source-connector-0] Aborting incomplete transaction (org.apache.kafka.clients.producer.KafkaProducer) [task-thread-json-sftp-source-connector-0]
>
> The issue with InvalidProducerEpochException happens when I call commit if there is an invalid record: in the next transaction I get InvalidProducerEpochException (the messages are copied in the previous email). I don't know if this will also be fixed by your bug fix. I am using Kafka version 3.3.1 as of now.
>
> Thanks,
> Nitty
>
> On Mon, Mar 13, 2023 at 10:47 AM NITTY BENNY <nittybe...@gmail.com> wrote:
>
> Hi Chris,
>
> The below mentioned issue is happening for the Json connector only. Is there any difference between the asn1, binary, csv and json connectors?
>
> Thanks,
> Nitty
>
> On Mon, Mar 13, 2023 at 9:16 AM NITTY BENNY <nittybe...@gmail.com> wrote:
>
> Hi Chris,
>
> Sorry Chris, I am not able to reproduce the above issue.
> > > >>> > >> > > > >>> > >> I want to share with you one more use case I found. > > > >>> > >> The use case is in the first batch it returns 2 valid records > > and > > > >>> then > > > >>> > in > > > >>> > >> the next batch it is an invalid record.Below is the > > > >>> transaction_state > > > >>> > >> topic, when I call a commit while processing an invalid > record. > > > >>> > >> > > > >>> > >> > > > >>> > > > > >>> > > > > > > connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0, > > > >>> > >> producerId=11, producerEpoch=2, txnTimeoutMs=60000, > > > state=*Ongoing*, > > > >>> > >> pendingState=None, topicPartitions=HashSet(streams-input-2), > > > >>> > >> txnStartTimestamp=1678620463834, > > > >>> txnLastUpdateTimestamp=1678620463834) > > > >>> > >> > > > >>> > >> then after some time I saw the below states as well, > > > >>> > >> > > > >>> > >> > > > >>> > > > > >>> > > > > > > connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0, > > > >>> > >> producerId=11, producerEpoch=3, txnTimeoutMs=60000, > > > >>> > state=*PrepareAbort*, > > > >>> > >> pendingState=None, topicPartitions=HashSet(streams-input-2), > > > >>> > >> txnStartTimestamp=1678620463834, > > > >>> txnLastUpdateTimestamp=1678620526119) > > > >>> > >> > > > >>> > > > > >>> > > > > > > connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0, > > > >>> > >> producerId=11, producerEpoch=3, txnTimeoutMs=60000, > > > >>> > state=*CompleteAbort*, > > > >>> > >> pendingState=None, topicPartitions=HashSet(), > > > >>> > >> txnStartTimestamp=1678620463834, > > > >>> txnLastUpdateTimestamp=1678620526121) > > > >>> > >> > > > >>> > >> Later for the next transaction, when it returns the first > batch > > > >>> below is > > > >>> > >> the logs I can see. > > > >>> > >> > > > >>> > >> Transiting to abortable error state due to > > > >>> > >> org.apache.kafka.common.errors.InvalidProducerEpochException: > > > >>> Producer > > > >>> > >> attempted to produce with an old epoch. > > > >>> > >> > (org.apache.kafka.clients.producer.internals.TransactionManager) > > > >>> > >> [kafka-producer-network-thread | > > > >>> > >> connector-producer-json-sftp-source-connector-0] > > > >>> > >> 2023-03-12 11:32:45,220 ERROR > > [json-sftp-source-connector|task-0] > > > >>> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} > > > failed > > > >>> to > > > >>> > send > > > >>> > >> record to streams-input: > > > >>> > >> (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask) > > > >>> > >> [kafka-producer-network-thread | > > > >>> > >> connector-producer-json-sftp-source-connector-0] > > > >>> > >> org.apache.kafka.common.errors.InvalidProducerEpochException: > > > >>> Producer > > > >>> > >> attempted to produce with an old epoch. > > > >>> > >> 2023-03-12 11:32:45,222 INFO > [json-sftp-source-connector|task-0] > > > >>> > >> [Producer > > > clientId=connector-producer-json-sftp-source-connector-0, > > > >>> > >> transactionalId=connect-cluster-json-sftp-source-connector-0] > > > >>> > Transiting to > > > >>> > >> fatal error state due to > > > >>> > >> org.apache.kafka.common.errors.ProducerFencedException: There > > is a > > > >>> newer > > > >>> > >> producer with the same transactionalId which fences the > current > > > one. 
> > > >>> > >> > (org.apache.kafka.clients.producer.internals.TransactionManager) > > > >>> > >> [kafka-producer-network-thread | > > > >>> > >> connector-producer-json-sftp-source-connector-0] > > > >>> > >> 2023-03-12 11:32:45,222 ERROR > > [json-sftp-source-connector|task-0] > > > >>> > >> [Producer > > > clientId=connector-producer-json-sftp-source-connector-0, > > > >>> > >> transactionalId=connect-cluster-json-sftp-source-connector-0] > > > >>> Aborting > > > >>> > >> producer batches due to fatal error > > > >>> > >> (org.apache.kafka.clients.producer.internals.Sender) > > > >>> > >> [kafka-producer-network-thread | > > > >>> > >> connector-producer-json-sftp-source-connector-0] > > > >>> > >> org.apache.kafka.common.errors.ProducerFencedException: There > > is a > > > >>> newer > > > >>> > >> producer with the same transactionalId which fences the > current > > > one. > > > >>> > >> 2023-03-12 11:32:45,222 ERROR > > [json-sftp-source-connector|task-0] > > > >>> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} > > > Failed > > > >>> to > > > >>> > >> flush offsets to storage: > > > >>> > >> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask) > > > >>> > >> [kafka-producer-network-thread | > > > >>> > >> connector-producer-json-sftp-source-connector-0] > > > >>> > >> org.apache.kafka.common.errors.ProducerFencedException: There > > is a > > > >>> newer > > > >>> > >> producer with the same transactionalId which fences the > current > > > one. > > > >>> > >> 2023-03-12 11:32:45,224 ERROR > > [json-sftp-source-connector|task-0] > > > >>> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} > > > failed > > > >>> to > > > >>> > send > > > >>> > >> record to streams-input: > > > >>> > >> (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask) > > > >>> > >> [kafka-producer-network-thread | > > > >>> > >> connector-producer-json-sftp-source-connector-0] > > > >>> > >> org.apache.kafka.common.errors.ProducerFencedException: There > > is a > > > >>> newer > > > >>> > >> producer with the same transactionalId which fences the > current > > > one. > > > >>> > >> 2023-03-12 11:32:45,222 ERROR > > > >>> > [json-sftp-source-connector|task-0|offsets] > > > >>> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} > > > Failed > > > >>> to > > > >>> > >> commit producer transaction > > > >>> > >> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask) > > > >>> > >> [task-thread-json-sftp-source-connector-0] > > > >>> > >> org.apache.kafka.common.errors.ProducerFencedException: There > > is a > > > >>> newer > > > >>> > >> producer with the same transactionalId which fences the > current > > > one. > > > >>> > >> 2023-03-12 11:32:45,225 ERROR > > [json-sftp-source-connector|task-0] > > > >>> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} > > Task > > > >>> threw > > > >>> > an > > > >>> > >> uncaught and unrecoverable exception. Task is being killed and > > > will > > > >>> not > > > >>> > >> recover until manually restarted > > > >>> > >> (org.apache.kafka.connect.runtime.WorkerTask) > > > >>> > >> [task-thread-json-sftp-source-connector-0] > > > >>> > >> > > > >>> > >> Do you know why it is showing an abort state even if I call > > > commit? 
> > > >>> > >> > > > >>> > >> I tested one more scenario, When I call the commit I saw the > > below > > > >>> > >> > > > >>> > > > > >>> > > > > > > connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0, > > > >>> > >> producerId=11, producerEpoch=2, txnTimeoutMs=60000, > > > state=*Ongoing*, > > > >>> > >> pendingState=None, topicPartitions=HashSet(streams-input-2), > > > >>> > >> txnStartTimestamp=1678620463834, > > > >>> txnLastUpdateTimestamp=1678620463834) > > > >>> > >> Then, before changing the states to Abort, I dropped the next > > file > > > >>> then > > > >>> > I > > > >>> > >> dont see any issues. Previous transaction > > > >>> > >> as well as the current transaction are committed. > > > >>> > >> > > > >>> > >> Thank you for your support. > > > >>> > >> > > > >>> > >> Thanks, > > > >>> > >> Nitty > > > >>> > >> > > > >>> > >> On Fri, Mar 10, 2023 at 8:04 PM Chris Egerton > > > >>> <chr...@aiven.io.invalid> > > > >>> > >> wrote: > > > >>> > >> > > > >>> > >>> Hi Nitty, > > > >>> > >>> > > > >>> > >>> > I called commitTransaction when I reach the first error > > record, > > > >>> but > > > >>> > >>> commit is not happening for me. Kafka connect tries to abort > > the > > > >>> > >>> transaction automatically > > > >>> > >>> > > > >>> > >>> This is really interesting--are you certain that your task > > never > > > >>> > invoked > > > >>> > >>> TransactionContext::abortTransaction in this case? I'm > looking > > > >>> over the > > > >>> > >>> code base and it seems fairly clear that the only thing that > > > could > > > >>> > >>> trigger > > > >>> > >>> a call to KafkaProducer::abortTransaction is a request by the > > > task > > > >>> to > > > >>> > >>> abort > > > >>> > >>> a transaction (either for a next batch, or for a specific > > > record). > > > >>> It > > > >>> > may > > > >>> > >>> help to run the connector in a debugger and/or look for > > "Aborting > > > >>> > >>> transaction for batch as requested by connector" or "Aborting > > > >>> > transaction > > > >>> > >>> for record on topic <TOPIC NAME HERE> as requested by > > connector" > > > >>> log > > > >>> > >>> messages (which will be emitted at INFO level by > > > >>> > >>> the > > org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask > > > >>> class > > > >>> > if > > > >>> > >>> the task is requesting an abort). > > > >>> > >>> > > > >>> > >>> Regardless, I'll work on a fix for the bug with aborting > empty > > > >>> > >>> transactions. Thanks for helping uncover that one! > > > >>> > >>> > > > >>> > >>> Cheers, > > > >>> > >>> > > > >>> > >>> Chris > > > >>> > >>> > > > >>> > >>> On Thu, Mar 9, 2023 at 6:36 PM NITTY BENNY < > > nittybe...@gmail.com > > > > > > > >>> > wrote: > > > >>> > >>> > > > >>> > >>> > Hi Chris, > > > >>> > >>> > > > > >>> > >>> > We have a use case to commit previous successful records > and > > > >>> stop the > > > >>> > >>> > processing of the current file and move on with the next > > file. > > > To > > > >>> > >>> achieve > > > >>> > >>> > that I called commitTransaction when I reach the first > error > > > >>> record, > > > >>> > >>> but > > > >>> > >>> > commit is not happening for me. Kafka connect tries to > abort > > > the > > > >>> > >>> > transaction automatically, I checked the _transaction_state > > > >>> topic and > > > >>> > >>> > states marked as PrepareAbort and CompleteAbort. 
Do you > know > > > why > > > >>> > kafka > > > >>> > >>> > connect automatically invokes abort instead of the implicit > > > >>> commit I > > > >>> > >>> > called? > > > >>> > >>> > Then as a result, when I tries to parse the next file - say > > > ABC, > > > >>> I > > > >>> > saw > > > >>> > >>> the > > > >>> > >>> > logs "Aborting incomplete transaction" and ERROR: "Failed > to > > > sent > > > >>> > >>> record to > > > >>> > >>> > topic", and we lost the first batch of records from the > > current > > > >>> > >>> transaction > > > >>> > >>> > in the file ABC. > > > >>> > >>> > > > > >>> > >>> > Is it possible that there's a case where an abort is being > > > >>> requested > > > >>> > >>> while > > > >>> > >>> > the current transaction is empty (i.e., the task hasn't > > > returned > > > >>> any > > > >>> > >>> > records from SourceTask::poll since the last transaction > was > > > >>> > >>> > committed/aborted)? --- Yes, that case is possible for us. > > > There > > > >>> is a > > > >>> > >>> case > > > >>> > >>> > where the first record itself an error record. > > > >>> > >>> > > > > >>> > >>> > Thanks, > > > >>> > >>> > Nitty > > > >>> > >>> > > > > >>> > >>> > On Thu, Mar 9, 2023 at 3:48 PM Chris Egerton > > > >>> <chr...@aiven.io.invalid > > > >>> > > > > > >>> > >>> > wrote: > > > >>> > >>> > > > > >>> > >>> > > Hi Nitty, > > > >>> > >>> > > > > > >>> > >>> > > Thanks for the code examples and the detailed > explanations, > > > >>> this is > > > >>> > >>> > really > > > >>> > >>> > > helpful! > > > >>> > >>> > > > > > >>> > >>> > > > Say if I have a file with 5 records and batch size is > 2, > > > and > > > >>> in > > > >>> > my > > > >>> > >>> 3rd > > > >>> > >>> > > batch I have one error record then in that batch, I dont > > > have a > > > >>> > valid > > > >>> > >>> > > record to call commit or abort. But I want to commit all > > the > > > >>> > previous > > > >>> > >>> > > batches that were successfully parsed. How do I do that? > > > >>> > >>> > > > > > >>> > >>> > > An important thing to keep in mind with the > > > TransactionContext > > > >>> API > > > >>> > is > > > >>> > >>> > that > > > >>> > >>> > > all records that a task returns from SourceTask::poll are > > > >>> > implicitly > > > >>> > >>> > > included in a transaction. Invoking > > > >>> > >>> SourceTaskContext::transactionContext > > > >>> > >>> > > doesn't alter this or cause transactions to start being > > used; > > > >>> > >>> everything > > > >>> > >>> > is > > > >>> > >>> > > already in a transaction, and the Connect runtime > > > automatically > > > >>> > >>> begins > > > >>> > >>> > > transactions for any records it sees from the task if it > > > hasn't > > > >>> > >>> already > > > >>> > >>> > > begun one. It's also valid to return a null or empty list > > of > > > >>> > records > > > >>> > >>> from > > > >>> > >>> > > SourceTask::poll. So in this case, you can invoke > > > >>> > >>> > > transactionContext.commitTransaction() (the no-args > > variant) > > > >>> and > > > >>> > >>> return > > > >>> > >>> > an > > > >>> > >>> > > empty batch from SourceTask::poll, which will cause the > > > >>> transaction > > > >>> > >>> > > containing the 4 valid records that were returned in the > > > last 2 > > > >>> > >>> batches > > > >>> > >>> > to > > > >>> > >>> > > be committed. > > > >>> > >>> > > > > > >>> > >>> > > FWIW, I would be a little cautious about this approach. 
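> For illustration, here is a minimal, untested sketch of that pattern; parseNextBatch(), skipRestOfFile(), and RecordValidationException are hypothetical placeholders, not your actual connector code:
>
> import java.util.Collections;
> import java.util.List;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.TransactionContext;
>
> @Override
> public List<SourceRecord> poll() throws InterruptedException {
>     // Null unless "transaction.boundary" is set to "connector".
>     TransactionContext txn = context.transactionContext();
>     try {
>         return parseNextBatch(); // reads the next batch from the current file
>     } catch (RecordValidationException e) {
>         if (txn != null) {
>             // Commit the transaction that already contains the valid records
>             // returned from earlier poll() calls; returning an empty batch
>             // still lets the runtime complete the requested commit.
>             txn.commitTransaction();
>         }
>         skipRestOfFile(); // disregard the failed record and the rest of the file
>         return Collections.emptyList();
>     }
> }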
> FWIW, I would be a little cautious about this approach. Many times it's better to fail fast on invalid data; it might be worth it to at least allow users to configure whether the connector fails on invalid data, or silently skips over it (which is what happens when transactions are aborted).
>
> > Why is abort not working without adding the last record to the list?
>
> Is it possible that there's a case where an abort is being requested while the current transaction is empty (i.e., the task hasn't returned any records from SourceTask::poll since the last transaction was committed/aborted)? I think this may be a bug in the Connect framework where we don't check to see if a transaction is already open when a task requests that a transaction be aborted, which can cause tasks to fail (see https://issues.apache.org/jira/browse/KAFKA-14799 for more details).
>
> Cheers,
>
> Chris
>
> On Wed, Mar 8, 2023 at 6:44 PM NITTY BENNY <nittybe...@gmail.com> wrote:
>
> Hi Chris,
>
> I am not sure if you are able to see the images I shared with you, so I am copying the code snippet below:
>
> if (expectedRecordCount >= 0) {
>     int missingCount = expectedRecordCount - (int) this.recordOffset() - 1;
>     if (missingCount > 0) {
>         if (transactionContext != null) {
>             isMissedRecords = true;
>         } else {
>             throw new DataException(String.format("Missing %d records (expecting %d, actual %d)", missingCount, expectedRecordCount, this.recordOffset()));
>         }
>     } else if (missingCount < 0) {
>         if (transactionContext != null) {
>             isMissedRecords = true;
>         } else {
>             throw new DataException(String.format("Too many records (expecting %d, actual %d)", expectedRecordCount, this.recordOffset()));
>         }
>     }
> }
> addLastRecord(records, null, value);
>
> // asn1 or binary abort
> if ((config.parseErrorThreshold != null && parseErrorCount >= config.parseErrorThreshold && lastbatch && transactionContext != null) || (isMissedRecords && transactionContext != null && lastbatch)) {
>     log.info("Transaction is aborting");
>     log.info("records = {}", records);
>     if (!records.isEmpty()) {
>         log.info("with record");
>         transactionContext.abortTransaction(records.get(records.size() - 1));
>     } else {
>         log.info("without record");
>         transactionContext.abortTransaction();
>     }
> }
>
> Thanks,
> Nitty
>
> On Wed, Mar 8, 2023 at 11:38 PM NITTY BENNY <nittybe...@gmail.com> wrote:
>
> Hi Chris,
> Sorry for the typo in my previous email.
>
> Regarding point 2, "the task returns a batch of records from SourceTask::poll (and, if using the per-record API provided by the TransactionContext class, includes at least one record that should trigger a transaction commit/abort in that batch)": what if I am using the API without passing a record? We have 2 types of use cases. The first is where, on encountering an error record, we want to commit the previous successful batches and disregard the failed record and upcoming batches. In this case we created the transactionContext just before reading the file (the file is our transaction boundary). Say if I have a file with 5 records and the batch size is 2, and in my 3rd batch I have one error record, then in that batch I don't have a valid record to call commit or abort. But I want to commit all the previous batches that were successfully parsed. How do I do that?
> The second use case is where I want to abort a transaction if the record count doesn't match.
> Code snippet:
>
> [image: image.png]
>
> There are no error records in this case. If you see, I added the transactionContext check to implement exactly-once; without transactions it just threw the exception without calling the addLastRecord() method, and in the catch block it just logs the message and returns the list of records, without the last record, to poll(). To make it work, I called the addLastRecord() method in this case, so it does not throw the exception and the list has the last record as well. Then I called the abort, and everything got aborted. Why is abort not working without adding the last record to the list?
>
> [image: image.png]
>
> Code to call abort.
>
> Thanks,
> Nitty
>
> On Wed, Mar 8, 2023 at 4:26 PM Chris Egerton <chr...@aiven.io.invalid> wrote:
>
> Hi Nitty,
>
> I'm a little confused about what you mean by this part:
>
> > transaction is not getting completed because it is not committing the transaction offset.
>
> The only conditions required for a transaction to be completed when a connector is defining its own transaction boundaries are:
>
> 1. The task requests a transaction commit/abort from the TransactionContext
> 2. The task returns a batch of records from SourceTask::poll (and, if using the per-record API provided by the TransactionContext class, includes at least one record that should trigger a transaction commit/abort in that batch)
>
> The Connect runtime should automatically commit source offsets to Kafka whenever a transaction is completed, either by commit or abort. This is because transactions should only be aborted for data that should never be re-read by the connector; if there is a validation error that should be handled by reconfiguring the connector, then the task should throw an exception instead of aborting the transaction.
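> To make the per-record variant in condition 2 concrete, here is a minimal, untested sketch (assume batch is a hypothetical List<SourceRecord> the task is about to return from poll()):
>
> // Attach the commit request to the last record of the batch; the
> // transaction is completed once that record is returned from poll().
> TransactionContext txn = context.transactionContext();
> if (txn != null && !batch.isEmpty()) {
>     txn.commitTransaction(batch.get(batch.size() - 1));
>     // txn.abortTransaction(record) is the corresponding per-record abort.
> }
> return batch;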
> If possible, do you think you could provide a brief code snippet illustrating what your task is doing that's causing issues?
>
> Cheers,
>
> Chris (not Chrise 🙂)
>
> On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY <nittybe...@gmail.com> wrote:
>
> Hi Chrise,
>
> Thanks for sharing the details.
>
> Regarding the use case: for the Asn1 source connector, we have a use case to validate the number of records in the file against the number of records in the header. So currently, if validation fails, we do not send the last record to the topic. But after introducing exactly-once with the connector transaction boundary, I can see that if I call an abort when the validation fails, the transaction is not getting completed, because it is not committing the transaction offset. I saw that the transaction state changed to CompleteAbort. So for my next transaction I am getting InvalidProducerEpochException, and the task stopped after that. When I tried calling the abort after sending the last record to the topic, the transaction got completed.
>
> I don't know if I am doing anything wrong here.
>
> Please advise.
> Thanks,
> Nitty
>
> On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton <chr...@aiven.io.invalid> wrote:
>
> Hi Nitty,
>
> We've recently added some documentation on implementing exactly-once source connectors here:
> https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors
>
> To quote a relevant passage from those docs:
>
> > In order for a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages.
>
> So, as long as your source connector is able to use the Kafka Connect framework's offsets API correctly, it shouldn't be necessary to make any other code changes to the connector.
>
> To enable exactly-once support for source connectors on your Connect cluster, see the docs section here:
> https://kafka.apache.org/documentation/#connect_exactlyoncesource
>
> With regard to transactions, a transactional producer is always created automatically for your connector by the Connect runtime when exactly-once support is enabled on the worker. The only reason to set "transaction.boundary" to "connector" is if your connector would like to explicitly define its own transaction boundaries. In this case, it sounds like that may be what you want; I just want to make sure to call out that, in either case, you should not be directly instantiating a producer in your connector code. Let the Kafka Connect runtime do that for you, and just worry about returning the right records from SourceTask::poll (and possibly defining custom transactions using the TransactionContext API).
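> (For reference, a sketch of the worker-side setting; this is the distributed worker properties file, not a complete config:
>
> # connect-distributed.properties
> # Roll every worker through "preparing" before switching to "enabled".
> exactly.once.source.support=enabled
> )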
> With respect to your question about committing or aborting in certain circumstances, it'd be useful to know more about your use case, since it may not be necessary to define custom transaction boundaries in your connector at all.
>
> Cheers,
>
> Chris
>
> On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY <nittybe...@gmail.com> wrote:
>
> Hi Team,
>
> Adding on top of this, I tried creating a TransactionContext object and calling the commitTransaction and abortTransaction methods in source connectors. But the main problem I saw is that, if there is any error while parsing the record, Connect calls an abort, but we have a use case to call commit in some cases. Is that a valid use case in terms of Kafka Connect?
>
> Another question - should I use a transactional producer instead of creating an object of TransactionContext? Below is the connector configuration I am using.
>
> exactly.once.support: "required"
> transaction.boundary: "connector"
>
> Could you please help me here?
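> (For a configuration like this to pass preflight validation, the connector class itself has to advertise support. A minimal, untested sketch; MySourceConnector is a placeholder name and the other required SourceConnector methods are omitted:
>
> import java.util.Map;
> import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
> import org.apache.kafka.connect.source.ExactlyOnceSupport;
> import org.apache.kafka.connect.source.SourceConnector;
>
> public class MySourceConnector extends SourceConnector {
>     @Override
>     public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
>         return ExactlyOnceSupport.SUPPORTED; // satisfies exactly.once.support: "required"
>     }
>
>     @Override
>     public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map<String, String> props) {
>         return ConnectorTransactionBoundaries.SUPPORTED; // satisfies transaction.boundary: "connector"
>     }
>
>     // version(), config(), start(), stop(), taskClass(), taskConfigs() omitted
> }
> )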
> Thanks,
> Nitty
>
> On Tue, Mar 7, 2023 at 12:29 AM NITTY BENNY <nittybe...@gmail.com> wrote:
>
> Hi Team,
>
> I am trying to implement exactly-once behavior in our source connector. Is there any sample source connector implementation available to have a look at?
>
> Regards,
> Nitty