Hi Xiaoxia,

I am not able to see the attachments you shared with me, so I don't
understand the problem you are describing.
What would you like me to look at?

Thanks,
Nitty

On Thu, Mar 16, 2023 at 1:54 AM 小侠 <747359...@qq.com.invalid> wrote:

> Hi Nitty,
>     I'm so sorry, I forgot to include the signature.
>     Looking forward to your reply.
>
>
>                 Thank you,
>                 Xiaoxia
>
>
>
>
> ------------------ Original Message ------------------
> *From:* "users" <nittybe...@gmail.com>;
> *Sent:* Wednesday, March 15, 2023, 6:38 PM
> *To:* "users"<users@kafka.apache.org>;
> *Subject:* Re: Exactly once kafka connect query
>
> Hi Chris,
>
> We won't be able to share the source code since it is proprietary Amdocs
> code.
>
> If you have time for a call, I can show you the code and
> reproduction scenario over the call. I strongly believe you can find the
> issue with that.
>
> Thanks,
> Nitty
>
> On Tue, Mar 14, 2023 at 3:04 PM Chris Egerton <chr...@aiven.io.invalid>
> wrote:
>
> > Hi Nitty,
> >
> > Sorry, I should have clarified. The reason I'm thinking about shutdown
> > here is that, when exactly-once support is enabled on a Kafka Connect
> > cluster and a new set of task configurations is generated for a
> > connector, the Connect framework makes an effort to shut down all the
> > old task instances for that connector, and then fences out the
> > transactional producers for all of those instances. I was thinking that
> > this may lead to the producer exceptions you are seeing but, after
> > double-checking this assumption, that does not appear to be the case.
> >
> > Would it be possible to share the source code for your connector and a
> > reproduction scenario for what you're seeing? That may be easier than
> > coordinating a call.
> >
> > Cheers,
> >
> > Chris
> >
> > On Tue, Mar 14, 2023 at 6:15 AM NITTY BENNY <nittybe...@gmail.com>
> wrote:
> >
> > > Hi Chris,
> > >
> > > Is there any possibility of having a call with you? This is
> > > blocking our delivery, and I really want to get this sorted out.
> > >
> > > Thanks,
> > > Nitty
> > >
> > > On Mon, Mar 13, 2023 at 8:18 PM NITTY BENNY <nittybe...@gmail.com>
> > wrote:
> > >
> > > > Hi Chris,
> > > >
> > > > I really don't understand why a graceful shutdown would happen
> > > > during a commit operation - am I misunderstanding something here? I
> > > > see this happen when I have a batch of 2 valid records and the
> > > > record in the second batch is invalid. In that case I want to commit
> > > > the valid records, so I called commit and sent an empty list for the
> > > > current batch to poll(); then, when the next file comes in and
> > > > poll() sees new records, I see an InvalidProducerEpochException.
> > > > Please advise me.
> > > >
> > > > Thanks,
> > > > Nitty
> > > >
> > > > On Mon, Mar 13, 2023 at 5:33 PM NITTY BENNY <nittybe...@gmail.com>
> > > wrote:
> > > >
> > > >> Hi Chris,
> > > >>
> > > >> The difference is in the Task classes; there is no difference in
> > > >> the key/value converters.
> > > >>
> > > >> I don't see log messages for a graceful shutdown. I am not clear on
> > > >> what you mean by shutting down the task.
> > > >>
> > > >> I called the commit operation for the successful records. Should I
> > > >> perform any other steps if I have an invalid record?
> > > >> Please advise.
> > > >>
> > > >> Thanks,
> > > >> Nitty
> > > >>
> > > >> On Mon, Mar 13, 2023 at 3:42 PM Chris Egerton
> <chr...@aiven.io.invalid
> > >
> > > >> wrote:
> > > >>
> > > >>> Hi Nitty,
> > > >>>
> > > >>> Thanks again for all the details here, especially the log messages.
> > > >>>
> > > >>> > The below mentioned issue is happening for the Json connector
> > > >>> > only. Is there any difference between the asn1, binary, csv and
> > > >>> > json connectors?
> > > >>>
> > > >>> Can you clarify if the difference here is in the Connector/Task
> > > >>> classes, or if it's in the key/value converters that are configured
> > > >>> for the connector? The key/value converters are configured using
> > > >>> the "key.converter" and "value.converter" properties and, if
> > > >>> problems arise with them, the task will fail and, if it has a
> > > >>> non-empty ongoing transaction, that transaction will be
> > > >>> automatically aborted, since we close the task's Kafka producer
> > > >>> when it fails (or shuts down gracefully).
> > > >>>
> > > >>> With regards to these log messages:
> > > >>>
> > > >>> > org.apache.kafka.common.errors.ProducerFencedException: There is
> > > >>> > a newer producer with the same transactionalId which fences the
> > > >>> > current one.
> > > >>>
> > > >>> It looks like your tasks aren't shutting down gracefully in time,
> > > >>> which causes them to be fenced out by the Connect framework later
> > > >>> on. Do you see messages like "Graceful stop of task <TASK ID HERE>
> > > >>> failed" in the logs for your Connect worker?
> > > >>>
> > > >>> Cheers,
> > > >>>
> > > >>> Chris
> > > >>>
> > > >>> On Mon, Mar 13, 2023 at 10:58 AM NITTY BENNY <nittybe...@gmail.com
> >
> > > >>> wrote:
> > > >>>
> > > >>> > Hi Chris,
> > > >>> >
> > > >>> > As you said, when I call an abort after an invalid record, I see
> > > >>> > the messages below for the next transaction, and then the
> > > >>> > connector is stopped.
> > > >>> > 2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0]
> > > >>> > Aborting transaction for batch as requested by connector
> > > >>> > (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
> > > >>> > [task-thread-json-sftp-source-connector-0]
> > > >>> > 2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0]
> > > >>> > [Producer clientId=connector-producer-json-sftp-source-connector-0,
> > > >>> > transactionalId=connect-cluster-json-sftp-source-connector-0]
> > > >>> > Aborting incomplete transaction
> > > >>> > (org.apache.kafka.clients.producer.KafkaProducer)
> > > >>> > [task-thread-json-sftp-source-connector-0]
> > > >>> >
> > > >>> > The InvalidProducerEpochException issue happens when I call
> > > >>> > commit after an invalid record: in the next transaction I get the
> > > >>> > InvalidProducerEpochException, and the messages are copied in the
> > > >>> > previous email. I don't know if this will also be fixed by your
> > > >>> > bug fix. I am using Kafka 3.3.1 as of now.
> > > >>> >
> > > >>> > Thanks,
> > > >>> > Nitty
> > > >>> >
> > > >>> >
> > > >>> > On Mon, Mar 13, 2023 at 10:47 AM NITTY BENNY <
> nittybe...@gmail.com
> > >
> > > >>> wrote:
> > > >>> >
> > > >>> > > Hi Chris,
> > > >>> > >
> > > >>> > > The below mentioned issue is happening for the Json connector
> > > >>> > > only. Is there any difference between the asn1, binary, csv and
> > > >>> > > json connectors?
> > > >>> > >
> > > >>> > > Thanks,
> > > >>> > > Nitty
> > > >>> > >
> > > >>> > > On Mon, Mar 13, 2023 at 9:16 AM NITTY BENNY <
> > nittybe...@gmail.com>
> > > >>> > wrote:
> > > >>> > >
> > > >>> > >> Hi Chris,
> > > >>> > >>
> > > >>> > >> Sorry Chris, I am not able to reproduce the above issue.
> > > >>> > >>
> > > >>> > >> I want to share one more use case I found: the first batch
> > > >>> > >> returns 2 valid records, and then the next batch contains an
> > > >>> > >> invalid record. Below is the __transaction_state topic content
> > > >>> > >> when I call a commit while processing the invalid record.
> > > >>> > >>
> > > >>> > >>
> > > >>> > >> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
> > > >>> > >> producerId=11, producerEpoch=2, txnTimeoutMs=60000, state=*Ongoing*,
> > > >>> > >> pendingState=None, topicPartitions=HashSet(streams-input-2),
> > > >>> > >> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620463834)
> > > >>> > >>
> > > >>> > >> then after some time I saw the below states as well,
> > > >>> > >>
> > > >>> > >>
> > > >>> > >> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
> > > >>> > >> producerId=11, producerEpoch=3, txnTimeoutMs=60000, state=*PrepareAbort*,
> > > >>> > >> pendingState=None, topicPartitions=HashSet(streams-input-2),
> > > >>> > >> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526119)
> > > >>> > >>
> > > >>> > >> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
> > > >>> > >> producerId=11, producerEpoch=3, txnTimeoutMs=60000, state=*CompleteAbort*,
> > > >>> > >> pendingState=None, topicPartitions=HashSet(),
> > > >>> > >> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526121)
> > > >>> > >>
> > > >>> > >> Later, for the next transaction, when it returns the first
> > > >>> > >> batch, below are the logs I see.
> > > >>> > >>
> > > >>> > >> Transiting to abortable error state due to
> > > >>> > >> org.apache.kafka.common.errors.InvalidProducerEpochException:
> > > >>> > >> Producer attempted to produce with an old epoch.
> > > >>> > >> (org.apache.kafka.clients.producer.internals.TransactionManager)
> > > >>> > >> [kafka-producer-network-thread |
> > > >>> > >> connector-producer-json-sftp-source-connector-0]
> > > >>> > >> 2023-03-12 11:32:45,220 ERROR [json-sftp-source-connector|task-0]
> > > >>> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0}
> > > >>> > >> failed to send record to streams-input:
> > > >>> > >> (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
> > > >>> > >> [kafka-producer-network-thread |
> > > >>> > >> connector-producer-json-sftp-source-connector-0]
> > > >>> > >> org.apache.kafka.common.errors.InvalidProducerEpochException:
> > > >>> > >> Producer attempted to produce with an old epoch.
> > > >>> > >> 2023-03-12 11:32:45,222 INFO [json-sftp-source-connector|task-0]
> > > >>> > >> [Producer clientId=connector-producer-json-sftp-source-connector-0,
> > > >>> > >> transactionalId=connect-cluster-json-sftp-source-connector-0]
> > > >>> > >> Transiting to fatal error state due to
> > > >>> > >> org.apache.kafka.common.errors.ProducerFencedException: There
> > > >>> > >> is a newer producer with the same transactionalId which fences
> > > >>> > >> the current one.
> > > >>> > >> (org.apache.kafka.clients.producer.internals.TransactionManager)
> > > >>> > >> [kafka-producer-network-thread |
> > > >>> > >> connector-producer-json-sftp-source-connector-0]
> > > >>> > >> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0]
> > > >>> > >> [Producer clientId=connector-producer-json-sftp-source-connector-0,
> > > >>> > >> transactionalId=connect-cluster-json-sftp-source-connector-0]
> > > >>> > >> Aborting producer batches due to fatal error
> > > >>> > >> (org.apache.kafka.clients.producer.internals.Sender)
> > > >>> > >> [kafka-producer-network-thread |
> > > >>> > >> connector-producer-json-sftp-source-connector-0]
> > > >>> > >> org.apache.kafka.common.errors.ProducerFencedException: There
> > > >>> > >> is a newer producer with the same transactionalId which fences
> > > >>> > >> the current one.
> > > >>> > >> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0]
> > > >>> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0}
> > > >>> > >> Failed to flush offsets to storage:
> > > >>> > >> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
> > > >>> > >> [kafka-producer-network-thread |
> > > >>> > >> connector-producer-json-sftp-source-connector-0]
> > > >>> > >> org.apache.kafka.common.errors.ProducerFencedException: There
> > > >>> > >> is a newer producer with the same transactionalId which fences
> > > >>> > >> the current one.
> > > >>> > >> 2023-03-12 11:32:45,224 ERROR [json-sftp-source-connector|task-0]
> > > >>> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0}
> > > >>> > >> failed to send record to streams-input:
> > > >>> > >> (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
> > > >>> > >> [kafka-producer-network-thread |
> > > >>> > >> connector-producer-json-sftp-source-connector-0]
> > > >>> > >> org.apache.kafka.common.errors.ProducerFencedException: There
> > > >>> > >> is a newer producer with the same transactionalId which fences
> > > >>> > >> the current one.
> > > >>> > >> 2023-03-12 11:32:45,222 ERROR
> > > >>> > >> [json-sftp-source-connector|task-0|offsets]
> > > >>> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0}
> > > >>> > >> Failed to commit producer transaction
> > > >>> > >> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
> > > >>> > >> [task-thread-json-sftp-source-connector-0]
> > > >>> > >> org.apache.kafka.common.errors.ProducerFencedException: There
> > > >>> > >> is a newer producer with the same transactionalId which fences
> > > >>> > >> the current one.
> > > >>> > >> 2023-03-12 11:32:45,225 ERROR [json-sftp-source-connector|task-0]
> > > >>> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0}
> > > >>> > >> Task threw an uncaught and unrecoverable exception. Task is
> > > >>> > >> being killed and will not recover until manually restarted
> > > >>> > >> (org.apache.kafka.connect.runtime.WorkerTask)
> > > >>> > >> [task-thread-json-sftp-source-connector-0]
> > > >>> > >>
> > > >>> > >> Do you know why it is showing an abort state even though I
> > > >>> > >> call commit?
> > > >>> > >>
> > > >>> > >> I tested one more scenario: when I call the commit, I saw the
> > > >>> > >> below state,
> > > >>> > >>
> > > >>> > >> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
> > > >>> > >> producerId=11, producerEpoch=2, txnTimeoutMs=60000, state=*Ongoing*,
> > > >>> > >> pendingState=None, topicPartitions=HashSet(streams-input-2),
> > > >>> > >> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620463834)
> > > >>> > >> Then, before the state changed to Abort, I dropped the next
> > > >>> > >> file, and I don't see any issues; the previous transaction as
> > > >>> > >> well as the current transaction are committed.
> > > >>> > >>
> > > >>> > >> Thank you for your support.
> > > >>> > >>
> > > >>> > >> Thanks,
> > > >>> > >> Nitty
> > > >>> > >>
> > > >>> > >> On Fri, Mar 10, 2023 at 8:04 PM Chris Egerton
> > > >>> <chr...@aiven.io.invalid>
> > > >>> > >> wrote:
> > > >>> > >>
> > > >>> > >>> Hi Nitty,
> > > >>> > >>>
> > > >>> > >>> > I called commitTransaction when I reach the first error
> > > >>> > >>> > record, but commit is not happening for me. Kafka connect
> > > >>> > >>> > tries to abort the transaction automatically
> > > >>> > >>>
> > > >>> > >>> This is really interesting--are you certain that your task
> > > >>> > >>> never invoked TransactionContext::abortTransaction in this
> > > >>> > >>> case? I'm looking over the code base and it seems fairly clear
> > > >>> > >>> that the only thing that could trigger a call to
> > > >>> > >>> KafkaProducer::abortTransaction is a request by the task to
> > > >>> > >>> abort a transaction (either for a next batch, or for a
> > > >>> > >>> specific record). It may help to run the connector in a
> > > >>> > >>> debugger and/or look for "Aborting transaction for batch as
> > > >>> > >>> requested by connector" or "Aborting transaction for record on
> > > >>> > >>> topic <TOPIC NAME HERE> as requested by connector" log
> > > >>> > >>> messages (which will be emitted at INFO level by the
> > > >>> > >>> org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask
> > > >>> > >>> class if the task is requesting an abort).
> > > >>> > >>>
> > > >>> > >>> Regardless, I'll work on a fix for the bug with aborting empty
> > > >>> > >>> transactions. Thanks for helping uncover that one!
> > > >>> > >>>
> > > >>> > >>> Cheers,
> > > >>> > >>>
> > > >>> > >>> Chris
> > > >>> > >>>
> > > >>> > >>> On Thu, Mar 9, 2023 at 6:36 PM NITTY BENNY <
> > nittybe...@gmail.com
> > > >
> > > >>> > wrote:
> > > >>> > >>>
> > > >>> > >>> > Hi Chris,
> > > >>> > >>> >
> > > >>> > >>> > We have a use case to commit the previous successful
> > > >>> > >>> > records, stop the processing of the current file, and move
> > > >>> > >>> > on to the next file. To achieve that I called
> > > >>> > >>> > commitTransaction when I reached the first error record, but
> > > >>> > >>> > the commit is not happening for me. Kafka Connect tries to
> > > >>> > >>> > abort the transaction automatically; I checked the
> > > >>> > >>> > __transaction_state topic and the states are marked as
> > > >>> > >>> > PrepareAbort and CompleteAbort. Do you know why Kafka
> > > >>> > >>> > Connect automatically invokes abort instead of the commit I
> > > >>> > >>> > called?
> > > >>> > >>> > Then, as a result, when I tried to parse the next file - say
> > > >>> > >>> > ABC - I saw the log "Aborting incomplete transaction" and the
> > > >>> > >>> > ERROR "Failed to send record to topic", and we lost the first
> > > >>> > >>> > batch of records from the current transaction in file ABC.
> > > >>> > >>> >
> > > >>> > >>> > Is it possible that there's a case where an abort is being
> > > >>> > >>> > requested while the current transaction is empty (i.e., the
> > > >>> > >>> > task hasn't returned any records from SourceTask::poll since
> > > >>> > >>> > the last transaction was committed/aborted)? --- Yes, that
> > > >>> > >>> > case is possible for us. There is a case where the first
> > > >>> > >>> > record itself is an error record.
> > > >>> > >>> >
> > > >>> > >>> > Thanks,
> > > >>> > >>> > Nitty
> > > >>> > >>> >
> > > >>> > >>> > On Thu, Mar 9, 2023 at 3:48 PM Chris Egerton
> > > >>> <chr...@aiven.io.invalid
> > > >>> > >
> > > >>> > >>> > wrote:
> > > >>> > >>> >
> > > >>> > >>> > > Hi Nitty,
> > > >>> > >>> > >
> > > >>> > >>> > > Thanks for the code examples and the detailed
> > > >>> > >>> > > explanations, this is really helpful!
> > > >>> > >>> > >
> > > >>> > >>> > > > Say if I have a file with 5 records and batch size is 2,
> > > >>> > >>> > > > and in my 3rd batch I have one error record, then in that
> > > >>> > >>> > > > batch I don't have a valid record to call commit or
> > > >>> > >>> > > > abort. But I want to commit all the previous batches that
> > > >>> > >>> > > > were successfully parsed. How do I do that?
> > > >>> > >>> > >
> > > >>> > >>> > > An important thing to keep in mind with the
> > > >>> > >>> > > TransactionContext API is that all records that a task
> > > >>> > >>> > > returns from SourceTask::poll are implicitly included in a
> > > >>> > >>> > > transaction. Invoking SourceTaskContext::transactionContext
> > > >>> > >>> > > doesn't alter this or cause transactions to start being
> > > >>> > >>> > > used; everything is already in a transaction, and the
> > > >>> > >>> > > Connect runtime automatically begins transactions for any
> > > >>> > >>> > > records it sees from the task if it hasn't already begun
> > > >>> > >>> > > one. It's also valid to return a null or empty list of
> > > >>> > >>> > > records from SourceTask::poll. So in this case, you can
> > > >>> > >>> > > invoke transactionContext.commitTransaction() (the no-args
> > > >>> > >>> > > variant) and return an empty batch from SourceTask::poll,
> > > >>> > >>> > > which will cause the transaction containing the 4 valid
> > > >>> > >>> > > records that were returned in the last 2 batches to be
> > > >>> > >>> > > committed.
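> > > >>> > >>> > >
> > > >>> > >>> > > As a minimal, hypothetical sketch of that approach (the
> > > >>> > >>> > > readNextBatch/containsInvalidRecord helpers are
> > > >>> > >>> > > placeholders, and transactionContext is obtained from
> > > >>> > >>> > > SourceTaskContext::transactionContext):
> > > >>> > >>> > >
> > > >>> > >>> > > // assumes java.util.* and org.apache.kafka.connect.source.*
> > > >>> > >>> > > @Override
> > > >>> > >>> > > public List<SourceRecord> poll() {
> > > >>> > >>> > >   List<SourceRecord> batch = readNextBatch();
> > > >>> > >>> > >   if (containsInvalidRecord(batch)) {
> > > >>> > >>> > >     if (transactionContext != null) {
> > > >>> > >>> > >       // commits the transaction containing the records
> > > >>> > >>> > >       // returned from earlier poll() calls
> > > >>> > >>> > >       transactionContext.commitTransaction();
> > > >>> > >>> > >     }
> > > >>> > >>> > >     return Collections.emptyList(); // an empty batch is valid
> > > >>> > >>> > >   }
> > > >>> > >>> > >   return batch;
> > > >>> > >>> > > }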
> > > >>> > >>> > >
> > > >>> > >>> > > FWIW, I would be a little cautious about this approach.
> > > >>> > >>> > > Many times it's better to fail fast on invalid data; it
> > > >>> > >>> > > might be worth it to at least allow users to configure
> > > >>> > >>> > > whether the connector fails on invalid data, or silently
> > > >>> > >>> > > skips over it (which is what happens when transactions are
> > > >>> > >>> > > aborted).
> > > >>> > >>> > >
> > > >>> > >>> > > > Why is abort not working without adding the last record
> > > >>> > >>> > > > to the list?
> > > >>> > >>> > >
> > > >>> > >>> > > Is it possible that there's a case where an abort is being
> > > >>> > >>> > > requested while the current transaction is empty (i.e.,
> > > >>> > >>> > > the task hasn't returned any records from SourceTask::poll
> > > >>> > >>> > > since the last transaction was committed/aborted)? I think
> > > >>> > >>> > > this may be a bug in the Connect framework where we don't
> > > >>> > >>> > > check to see if a transaction is already open when a task
> > > >>> > >>> > > requests that a transaction be aborted, which can cause
> > > >>> > >>> > > tasks to fail (see
> > > >>> > >>> > > https://issues.apache.org/jira/browse/KAFKA-14799 for more
> > > >>> > >>> > > details).
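> > > >>> > >>> > >
> > > >>> > >>> > > Until that fix lands, one defensive workaround (a sketch,
> > > >>> > >>> > > using a hypothetical flag that the task maintains itself)
> > > >>> > >>> > > is to only request aborts when the transaction is known to
> > > >>> > >>> > > be non-empty:
> > > >>> > >>> > >
> > > >>> > >>> > > // set to true whenever poll() returns records; reset on
> > > >>> > >>> > > // each commit/abort request
> > > >>> > >>> > > private boolean transactionOpen = false;
> > > >>> > >>> > >
> > > >>> > >>> > > private void maybeAbort() {
> > > >>> > >>> > >   if (transactionContext != null && transactionOpen) {
> > > >>> > >>> > >     transactionContext.abortTransaction();
> > > >>> > >>> > >     transactionOpen = false;
> > > >>> > >>> > >   }
> > > >>> > >>> > > }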
> > > >>> > >>> > >
> > > >>> > >>> > > Cheers,
> > > >>> > >>> > >
> > > >>> > >>> > > Chris
> > > >>> > >>> > >
> > > >>> > >>> > >
> > > >>> > >>> > > On Wed, Mar 8, 2023 at 6:44 PM NITTY BENNY <
> > > >>> nittybe...@gmail.com>
> > > >>> > >>> wrote:
> > > >>> > >>> > >
> > > >>> > >>> > > > Hi Chris,
> > > >>> > >>> > > >
> > > >>> > >>> > > > I am not sure if you are able to see the images I shared
> > > >>> > >>> > > > with you. Copying the code snippet below:
> > > >>> > >>> > > >
> > > >>> > >>> > > > if (expectedRecordCount >= 0) {
> > > >>> > >>> > > >   int missingCount = expectedRecordCount - (int) this.recordOffset() - 1;
> > > >>> > >>> > > >   if (missingCount > 0) {
> > > >>> > >>> > > >     if (transactionContext != null) {
> > > >>> > >>> > > >       isMissedRecords = true;
> > > >>> > >>> > > >     } else {
> > > >>> > >>> > > >       throw new DataException(String.format(
> > > >>> > >>> > > >           "Missing %d records (expecting %d, actual %d)",
> > > >>> > >>> > > >           missingCount, expectedRecordCount, this.recordOffset()));
> > > >>> > >>> > > >     }
> > > >>> > >>> > > >   } else if (missingCount < 0) {
> > > >>> > >>> > > >     if (transactionContext != null) {
> > > >>> > >>> > > >       isMissedRecords = true;
> > > >>> > >>> > > >     } else {
> > > >>> > >>> > > >       throw new DataException(String.format(
> > > >>> > >>> > > >           "Too many records (expecting %d, actual %d)",
> > > >>> > >>> > > >           expectedRecordCount, this.recordOffset()));
> > > >>> > >>> > > >     }
> > > >>> > >>> > > >   }
> > > >>> > >>> > > > }
> > > >>> > >>> > > > addLastRecord(records, null, value);
> > > >>> > >>> > > >
> > > >>> > >>> > > >
> > > >>> > >>> > > >
> > > >>> > >>> > > > // asn1 or binary abort
> > > >>> > >>> > > > if ((config.parseErrorThreshold != null
> > > >>> > >>> > > >     && parseErrorCount >= config.parseErrorThreshold
> > > >>> > >>> > > >     && lastbatch && transactionContext != null)
> > > >>> > >>> > > >     || (isMissedRecords && transactionContext != null && lastbatch)) {
> > > >>> > >>> > > >   log.info("Transaction is aborting");
> > > >>> > >>> > > >   log.info("records = {}", records);
> > > >>> > >>> > > >   if (!records.isEmpty()) {
> > > >>> > >>> > > >     log.info("with record");
> > > >>> > >>> > > >     transactionContext.abortTransaction(records.get(records.size() - 1));
> > > >>> > >>> > > >   } else {
> > > >>> > >>> > > >     log.info("without record");
> > > >>> > >>> > > >     transactionContext.abortTransaction();
> > > >>> > >>> > > >   }
> > > >>> > >>> > > > }
> > > >>> > >>> > > >
> > > >>> > >>> > > > Thanks,
> > > >>> > >>> > > > Nitty
> > > >>> > >>> > > >
> > > >>> > >>> > > > On Wed, Mar 8, 2023 at 11:38 PM NITTY BENNY <
> > > >>> > nittybe...@gmail.com>
> > > >>> > >>> > > wrote:
> > > >>> > >>> > > >
> > > >>> > >>> > > >> Hi Chris,
> > > >>> > >>> > > >> Sorry for the typo in my previous email.
> > > >>> > >>> > > >>
> > > >>> > >>> > > >> Regarding point 2, *the task returns a batch of records
> > > >>> > >>> > > >> from SourceTask::poll (and, if using the per-record API
> > > >>> > >>> > > >> provided by the TransactionContext class, includes at
> > > >>> > >>> > > >> least one record that should trigger a transaction
> > > >>> > >>> > > >> commit/abort in that batch)*
> > > >>> > >>> > > >> What if I am using the API without passing a record? We
> > > >>> > >>> > > >> have 2 types of use cases. In the first, on encountering
> > > >>> > >>> > > >> an error record, we want to commit the previous
> > > >>> > >>> > > >> successful batches and disregard the failed record and
> > > >>> > >>> > > >> the upcoming batches. In this case we created the
> > > >>> > >>> > > >> transactionContext just before reading the file (the
> > > >>> > >>> > > >> file is our transaction boundary). Say I have a file
> > > >>> > >>> > > >> with 5 records and the batch size is 2, and in my 3rd
> > > >>> > >>> > > >> batch I have one error record; then in that batch I
> > > >>> > >>> > > >> don't have a valid record with which to call commit or
> > > >>> > >>> > > >> abort. But I want to commit all the previous batches
> > > >>> > >>> > > >> that were successfully parsed. How do I do that?
> > > >>> > >>> > > >>
> > > >>> > >>> > > >> The second use case is where I want to abort a
> > > >>> > >>> > > >> transaction if the record count doesn't match.
> > > >>> > >>> > > >> Code Snippet :
> > > >>> > >>> > > >> [image: image.png]
> > > >>> > >>> > > >> There are no error records in this case. If you see, I
> > > >>> > >>> > > >> added the transactionContext check to implement
> > > >>> > >>> > > >> exactly-once; without transactions it just threw the
> > > >>> > >>> > > >> exception without calling the addLastRecord() method,
> > > >>> > >>> > > >> and in the catch block it just logs the message and
> > > >>> > >>> > > >> returns the list of records, without the last record, to
> > > >>> > >>> > > >> poll(). To make it work, I called the addLastRecord()
> > > >>> > >>> > > >> method in this case, so it does not throw the exception
> > > >>> > >>> > > >> and the list has the last record as well. Then I called
> > > >>> > >>> > > >> the abort, and everything got aborted. Why is abort not
> > > >>> > >>> > > >> working without adding the last record to the list?
> > > >>> > >>> > > >> [image: image.png]
> > > >>> > >>> > > >>
> > > >>> > >>> > > >> Code to call abort.
> > > >>> > >>> > > >>
> > > >>> > >>> > > >>
> > > >>> > >>> > > >>
> > > >>> > >>> > > >>
> > > >>> > >>> > > >> Thanks,
> > > >>> > >>> > > >> Nitty
> > > >>> > >>> > > >>
> > > >>> > >>> > > >> On Wed, Mar 8, 2023 at 4:26 PM Chris Egerton
> > > >>> > >>> <chr...@aiven.io.invalid
> > > >>> > >>> > >
> > > >>> > >>> > > >> wrote:
> > > >>> > >>> > > >>
> > > >>> > >>> > > >>> Hi Nitty,
> > > >>> > >>> > > >>>
> > > >>> > >>> > > >>> I'm a little confused about what you mean by this part:
> > > >>> > >>> > > >>>
> > > >>> > >>> > > >>> > transaction is not getting completed because it is
> > > >>> > >>> > > >>> > not committing the transaction offset.
> > > >>> > >>> > > >>>
> > > >>> > >>> > > >>> The only conditions required for a transaction to be
> > > >>> > >>> > > >>> completed when a connector is defining its own
> > > >>> > >>> > > >>> transaction boundaries are:
> > > >>> > >>> > > >>>
> > > >>> > >>> > > >>> 1. The task requests a transaction commit/abort from
> > > >>> > >>> > > >>> the TransactionContext
> > > >>> > >>> > > >>> 2. The task returns a batch of records from
> > > >>> > >>> > > >>> SourceTask::poll (and, if using the per-record API
> > > >>> > >>> > > >>> provided by the TransactionContext class, includes at
> > > >>> > >>> > > >>> least one record that should trigger a transaction
> > > >>> > >>> > > >>> commit/abort in that batch)
> > > >>> > >>> > > >>>
> > > >>> > >>> > > >>> The Connect runtime should automatically commit source
> > > >>> > >>> > > >>> offsets to Kafka whenever a transaction is completed,
> > > >>> > >>> > > >>> either by commit or abort. This is because transactions
> > > >>> > >>> > > >>> should only be aborted for data that should never be
> > > >>> > >>> > > >>> re-read by the connector; if there is a validation
> > > >>> > >>> > > >>> error that should be handled by reconfiguring the
> > > >>> > >>> > > >>> connector, then the task should throw an exception
> > > >>> > >>> > > >>> instead of aborting the transaction.
> > > >>> > >>> > > >>>
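> > > >>> > >>> > > >>> For illustration, a minimal sketch of the per-record
> > > >>> > >>> > > >>> variant of condition 2 (assuming "records" is the
> > > >>> > >>> > > >>> batch the task is about to return from poll()):
> > > >>> > >>> > > >>>
> > > >>> > >>> > > >>> // Ask for the transaction to be committed once this
> > > >>> > >>> > > >>> // record is dispatched; it must be part of the batch.
> > > >>> > >>> > > >>> SourceRecord last = records.get(records.size() - 1);
> > > >>> > >>> > > >>> transactionContext.commitTransaction(last);
> > > >>> > >>> > > >>> return records;
> > > >>> > >>> > > >>>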
> > > >>> > >>> > > >>> If possible, do you think you could provide a brief
> > > >>> > >>> > > >>> code snippet illustrating what your task is doing
> > > >>> > >>> > > >>> that's causing issues?
> > > >>> > >>> > > >>>
> > > >>> > >>> > > >>> Cheers,
> > > >>> > >>> > > >>>
> > > >>> > >>> > > >>> Chris (not Chrise 🙂)
> > > >>> > >>> > > >>>
> > > >>> > >>> > > >>> On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY <
> > > >>> > >>> nittybe...@gmail.com>
> > > >>> > >>> > > >>> wrote:
> > > >>> > >>> > > >>>
> > > >>> > >>> > > >>> > Hi Chrise,
> > > >>> > >>> > > >>> >
> > > >>> > >>> > > >>> > Thanks for sharing the details.
> > > >>> > >>> > > >>> >
> > > >>> > >>> > > >>> > Regarding the use case: for the Asn1 source
> > > >>> > >>> > > >>> > connector we have a use case to validate the number
> > > >>> > >>> > > >>> > of records in the file against the number of records
> > > >>> > >>> > > >>> > in the header. So currently, if validation fails, we
> > > >>> > >>> > > >>> > are not sending the last record to the topic. But
> > > >>> > >>> > > >>> > after introducing exactly-once with the connector
> > > >>> > >>> > > >>> > transaction boundary, I can see that if I call an
> > > >>> > >>> > > >>> > abort when the validation fails, the transaction is
> > > >>> > >>> > > >>> > not getting completed because it is not committing
> > > >>> > >>> > > >>> > the transaction offset. I saw that the transaction
> > > >>> > >>> > > >>> > state changed to CompleteAbort. So for my next
> > > >>> > >>> > > >>> > transaction I am getting
> > > >>> > >>> > > >>> > InvalidProducerEpochException, and the task stopped
> > > >>> > >>> > > >>> > after that. I tried calling the abort after sending
> > > >>> > >>> > > >>> > the last record to the topic, and then the
> > > >>> > >>> > > >>> > transaction got completed.
> > > >>> > >>> > > >>> >
> > > >>> > >>> > > >>> > I don't know if I am doing anything wrong here.
> > > >>> > >>> > > >>> >
> > > >>> > >>> > > >>> > Please advise.
> > > >>> > >>> > > >>> > Thanks,
> > > >>> > >>> > > >>> > Nitty
> > > >>> > >>> > > >>> >
> > > >>> > >>> > > >>> > On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton
> > > >>> > >>> > > <chr...@aiven.io.invalid
> > > >>> > >>> > > >>> >
> > > >>> > >>> > > >>> > wrote:
> > > >>> > >>> > > >>> >
> > > >>> > >>> > > >>> > > Hi Nitty,
> > > >>> > >>> > > >>> > >
> > > >>> > >>> > > >>> > > We've recently added some documentation on
> > > >>> > >>> > > >>> > > implementing exactly-once source connectors here:
> > > >>> > >>> > > >>> > > https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors
> > > >>> > >>> > > >>> > > To quote a relevant passage from those docs:
> > > >>> > >>> > > >>> > >
> > > >>> > >>> > > >>> > > > In order for a source connector to take
> > > >>> > >>> > > >>> > > > advantage of this support, it must be able to
> > > >>> > >>> > > >>> > > > provide meaningful source offsets for each record
> > > >>> > >>> > > >>> > > > that it emits, and resume consumption from the
> > > >>> > >>> > > >>> > > > external system at the exact position
> > > >>> > >>> > > >>> > > > corresponding to any of those offsets without
> > > >>> > >>> > > >>> > > > dropping or duplicating messages.
> > > >>> > >>> > > >>> > >
> > > >>> > >>> > > >>> > > So, as long as your source connector is able to
> > > >>> > >>> > > >>> > > use the Kafka Connect framework's offsets API
> > > >>> > >>> > > >>> > > correctly, it shouldn't be necessary to make any
> > > >>> > >>> > > >>> > > other code changes to the connector.
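> > > >>> > >>> > > >>> > >
> > > >>> > >>> > > >>> > > For illustration only, a minimal sketch of what
> > > >>> > >>> > > >>> > > meaningful, resumable offsets might look like for a
> > > >>> > >>> > > >>> > > file-based source (fileName, recordNumber, topic,
> > > >>> > >>> > > >>> > > valueSchema and value are hypothetical variables):
> > > >>> > >>> > > >>> > >
> > > >>> > >>> > > >>> > > // which file this record came from
> > > >>> > >>> > > >>> > > Map<String, Object> sourcePartition =
> > > >>> > >>> > > >>> > >     Collections.singletonMap("filename", fileName);
> > > >>> > >>> > > >>> > > // how far into the file the task has read
> > > >>> > >>> > > >>> > > Map<String, Object> sourceOffset =
> > > >>> > >>> > > >>> > >     Collections.singletonMap("position", recordNumber);
> > > >>> > >>> > > >>> > > return new SourceRecord(sourcePartition, sourceOffset,
> > > >>> > >>> > > >>> > >     topic, valueSchema, value);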
> > > >>> > >>> > > >>> > >
> > > >>> > >>> > > >>> > > To enable exactly-once support for source
> > > >>> > >>> > > >>> > > connectors on your Connect cluster, see the docs
> > > >>> > >>> > > >>> > > section here:
> > > >>> > >>> > > >>> > > https://kafka.apache.org/documentation/#connect_exactlyoncesource
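> > > >>> > >>> > > >>> > >
> > > >>> > >>> > > >>> > > For example (the worker-level property from
> > > >>> > >>> > > >>> > > KIP-618; this assumes a distributed-mode worker):
> > > >>> > >>> > > >>> > >
> > > >>> > >>> > > >>> > > # connect-distributed.properties
> > > >>> > >>> > > >>> > > exactly.once.source.support=enabled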
> > > >>> > >>> > > >>> > >
> > > >>> > >>> > > >>> > > With regard to transactions, a transactional
> > > >>> > >>> > > >>> > > producer is always created automatically for your
> > > >>> > >>> > > >>> > > connector by the Connect runtime when exactly-once
> > > >>> > >>> > > >>> > > support is enabled on the worker. The only reason
> > > >>> > >>> > > >>> > > to set "transaction.boundary" to "connector" is if
> > > >>> > >>> > > >>> > > your connector would like to explicitly define its
> > > >>> > >>> > > >>> > > own transaction boundaries. In this case, it sounds
> > > >>> > >>> > > >>> > > like that may be what you want; I just want to call
> > > >>> > >>> > > >>> > > out that in either case, you should not be directly
> > > >>> > >>> > > >>> > > instantiating a producer in your connector code,
> > > >>> > >>> > > >>> > > but let the Kafka Connect runtime do that for you,
> > > >>> > >>> > > >>> > > and just worry about returning the right records
> > > >>> > >>> > > >>> > > from SourceTask::poll (and possibly defining custom
> > > >>> > >>> > > >>> > > transactions using the TransactionContext API).
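> > > >>> > >>> > > >>> > >
> > > >>> > >>> > > >>> > > As a sketch, a connector that defines its own
> > > >>> > >>> > > >>> > > transaction boundaries would also advertise that to
> > > >>> > >>> > > >>> > > the runtime by overriding these SourceConnector
> > > >>> > >>> > > >>> > > methods (from KIP-618):
> > > >>> > >>> > > >>> > >
> > > >>> > >>> > > >>> > > @Override
> > > >>> > >>> > > >>> > > public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
> > > >>> > >>> > > >>> > >   return ExactlyOnceSupport.SUPPORTED;
> > > >>> > >>> > > >>> > > }
> > > >>> > >>> > > >>> > >
> > > >>> > >>> > > >>> > > @Override
> > > >>> > >>> > > >>> > > public ConnectorTransactionBoundaries canDefineTransactionBoundaries(
> > > >>> > >>> > > >>> > >     Map<String, String> props) {
> > > >>> > >>> > > >>> > >   return ConnectorTransactionBoundaries.SUPPORTED;
> > > >>> > >>> > > >>> > > }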
> > > >>> > >>> > > >>> > >
> > > >>> > >>> > > >>> > > With respect to your question about committing or
> > > >>> > >>> > > >>> > > aborting in certain circumstances, it'd be useful
> > > >>> > >>> > > >>> > > to know more about your use case, since it may not
> > > >>> > >>> > > >>> > > be necessary to define custom transaction
> > > >>> > >>> > > >>> > > boundaries in your connector at all.
> > > >>> > >>> > > >>> > >
> > > >>> > >>> > > >>> > > Cheers,
> > > >>> > >>> > > >>> > >
> > > >>> > >>> > > >>> > > Chris
> > > >>> > >>> > > >>> > >
> > > >>> > >>> > > >>> > >
> > > >>> > >>> > > >>> > >
> > > >>> > >>> > > >>> > > On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY <
> > > >>> > >>> nittybe...@gmail.com
> > > >>> > >>> > >
> > > >>> > >>> > > >>> wrote:
> > > >>> > >>> > > >>> > >
> > > >>> > >>> > > >>> > > > Hi Team,
> > > >>> > >>> > > >>> > > >
> > > >>> > >>> > > >>> > > > Adding on top of this, I tried creating a
> > > >>> > >>> > > >>> > > > TransactionContext object and calling the
> > > >>> > >>> > > >>> > > > commitTransaction and abortTransaction methods in
> > > >>> > >>> > > >>> > > > the source connectors. But the main problem I saw
> > > >>> > >>> > > >>> > > > is that if there is any error while parsing a
> > > >>> > >>> > > >>> > > > record, Connect calls an abort, but we have a use
> > > >>> > >>> > > >>> > > > case to call commit in some cases. Is that a
> > > >>> > >>> > > >>> > > > valid use case in terms of Kafka Connect?
> > > >>> > >>> > > >>> > > >
> > > >>> > >>> > > >>> > > > Another question - should I use a transactional
> > > >>> > >>> > > >>> > > > producer instead of creating an object of
> > > >>> > >>> > > >>> > > > TransactionContext? Below is the connector
> > > >>> > >>> > > >>> > > > configuration I am using.
> > > >>> > >>> > > >>> > > >
> > > >>> > >>> > > >>> > > >   exactly.once.support: "required"
> > > >>> > >>> > > >>> > > >   transaction.boundary: "connector"
> > > >>> > >>> > > >>> > > >
> > > >>> > >>> > > >>> > > > Could you please help me here?
> > > >>> > >>> > > >>> > > >
> > > >>> > >>> > > >>> > > > Thanks,
> > > >>> > >>> > > >>> > > > Nitty
> > > >>> > >>> > > >>> > > >
> > > >>> > >>> > > >>> > > > On Tue, Mar 7, 2023 at 12:29 AM NITTY BENNY <
> > > >>> > >>> > > nittybe...@gmail.com>
> > > >>> > >>> > > >>> > > wrote:
> > > >>> > >>> > > >>> > > >
> > > >>> > >>> > > >>> > > > > Hi Team,
> > > >>> > >>> > > >>> > > > > I am trying to implement exactly-once behavior
> > > >>> > >>> > > >>> > > > > in our source connector. Is there any sample
> > > >>> > >>> > > >>> > > > > source connector implementation available to
> > > >>> > >>> > > >>> > > > > have a look at?
> > > >>> > >>> > > >>> > > > > Regards,
> > > >>> > >>> > > >>> > > > > Nitty
> > > >>> > >>> > > >>> > > > >
> > > >>> > >>> > > >>> > > >
> > > >>> > >>> > > >>> > >
> > > >>> > >>> > > >>> >
> > > >>> > >>> > > >>>
> > > >>> > >>> > > >>
> > > >>> > >>> > >
> > > >>> > >>> >
> > > >>> > >>>
> > > >>> > >>
> > > >>> >
> > > >>>
> > > >>
> > >
> >
>
>
