Re: Exactly once kafka connect query

2023-03-16 Thread Chris Egerton
Hi Nitty,

I understand your concerns about preserving intellectual property. Perhaps
to avoid these altogether, instead of a call, you can provide a
reproduction of your issues that is acceptable to share with the public? If
I'm able to successfully diagnose the problem, I can share a summary on the
mailing list, and if things are still unclear, I can join a call to discuss
the publicly-visible example and what seems to be the issue.

Cheers,

Chris

On Thu, Mar 16, 2023 at 5:54 AM NITTY BENNY  wrote:

> Hi Xiaoxia,
>
> I am not able to see the attachments you shared with me. I don't understand
> the problem you are talking about.
> What do you want me to look at?
>
> Thanks,
> Nitty
>
> On Thu, Mar 16, 2023 at 1:54 AM 小侠 <747359...@qq.com.invalid> wrote:
>
> > Hi Nitty,
> > I'm so sorry to forget the signature.
> > Looking forward to your reply.
> >
> >
> > Thank you,
> > Xiaoxia
> >
> >
> >
> >
> > -------------- Original Message --------------
> > *From:* "users" ;
> > *Sent:* Wednesday, March 15, 2023, 6:38 PM
> > *To:* "users";
> > *Subject:* Re: Exactly once kafka connect query
> >
> > Hi Chris,
> >
> > We won't be able to share the source code since it is proprietary Amdocs
> > code.
> >
> > If you have time for a call, I can show you the code and
> > reproduction scenario over the call. I strongly believe you can find the
> > issue with that.
> >
> > Thanks,
> > Nitty
> >
> > On Tue, Mar 14, 2023 at 3:04 PM Chris Egerton 
> > wrote:
> >
> > > Hi Nitty,
> > >
> > > Sorry, I should have clarified. The reason I'm thinking about shutdown
> > here
> > > is that, when exactly-once support is enabled on a Kafka Connect
> cluster
> > > and a new set of task configurations is generated for a connector, the
> > > Connect framework makes an effort to shut down all the old task
> instances
> > > for that connector, and then fences out the transactional producers for
> > all
> > > of those instances. I was thinking that this may lead to the producer
> > > exceptions you are seeing but, after double-checking this assumption,
> > that
> > > does not appear to be the case.
> > >
> > > Would it be possible to share the source code for your connector and a
> > > reproduction scenario for what you're seeing? That may be easier than
> > > coordinating a call.
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Tue, Mar 14, 2023 at 6:15 AM NITTY BENNY 
> > wrote:
> > >
> > > > Hi Chris,
> > > >
> > > > Is there any possibility to have a call with you? This is actually
> > > blocking
> > > > our delivery, and I really want to sort this out.
> > > >
> > > > Thanks,
> > > > Nitty
> > > >
> > > > On Mon, Mar 13, 2023 at 8:18 PM NITTY BENNY 
> > > wrote:
> > > >
> > > > > Hi Chris,
> > > > >
> > > > > I really don't understand why a graceful shutdown will happen
> during
> > a
> > > > > commit operation? Am I understanding something wrong here? I see
> > > > > this happens when I have a batch of 2 valid records and in the
> second
> > > > > batch the record is invalid. In that case I want to commit the
> valid
> > > > > records. So I called commit and sent an empty list for the current
> > > batch
> > > > to
> > > > > poll() and then when the next file comes in and poll sees new
> > records,
> > > I
> > > > > see InvalidProducerEpochException.
> > > > > Please advise me.
> > > > >
> > > > > Thanks,
> > > > > Nitty
> > > > >
> > > > > On Mon, Mar 13, 2023 at 5:33 PM NITTY BENNY 
> > > > wrote:
> > > > >
> > > > >> Hi Chris,
> > > > >>
> > > > >> The difference is in the Task Classes, no difference for value/key
> > > > >> convertors.
> > > > >>
> > > > >> I don’t see log messages for graceful shutdown. I am not clear on
> > what
> > > > >> you mean by shutting down the task.
> > > > >>
> > > > >> I called the commit operation for the successful records. Should I
> > > > >> perform any other steps if I have an invalid record?

Re: Exactly once kafka connect query

2023-03-16 Thread NITTY BENNY
Hi Xiaoxia,

I am not able to see the attachments you shared with me. I don't understand
the problem you are talking about.
What do you want me to look at?

Thanks,
Nitty

On Thu, Mar 16, 2023 at 1:54 AM 小侠 <747359...@qq.com.invalid> wrote:

> Hi Nitty,
> I'm so sorry to forget the signature.
> Looking forward to your reply.
>
>
> Thank you,
> Xiaoxia
>
>
>
>
> -- Original Message --
> *From:* "users" ;
> *Sent:* Wednesday, March 15, 2023, 6:38 PM
> *To:* "users";
> *Subject:* Re: Exactly once kafka connect query
>
> Hi Chris,
>
> We won't be able to share the source code since it is proprietary Amdocs
> code.
>
> If you have time for a call, I can show you the code and
> reproduction scenario over the call. I strongly believe you can find the
> issue with that.
>
> Thanks,
> Nitty
>
> On Tue, Mar 14, 2023 at 3:04 PM Chris Egerton 
> wrote:
>
> > Hi Nitty,
> >
> > Sorry, I should have clarified. The reason I'm thinking about shutdown
> here
> > is that, when exactly-once support is enabled on a Kafka Connect cluster
> > and a new set of task configurations is generated for a connector, the
> > Connect framework makes an effort to shut down all the old task instances
> > for that connector, and then fences out the transactional producers for
> all
> > of those instances. I was thinking that this may lead to the producer
> > exceptions you are seeing but, after double-checking this assumption,
> that
> > does not appear to be the case.
> >
> > Would it be possible to share the source code for your connector and a
> > reproduction scenario for what you're seeing? That may be easier than
> > coordinating a call.
> >
> > Cheers,
> >
> > Chris
> >
> > On Tue, Mar 14, 2023 at 6:15 AM NITTY BENNY 
> wrote:
> >
> > > Hi Chris,
> > >
> > > Is there any possibility to have a call with you? This is actually
> > blocking
> > > our delivery, and I really want to sort this out.
> > >
> > > Thanks,
> > > Nitty
> > >
> > > On Mon, Mar 13, 2023 at 8:18 PM NITTY BENNY 
> > wrote:
> > >
> > > > Hi Chris,
> > > >
> > > > I really don't understand why a graceful shutdown will happen during
> a
> > > > commit operation? Am I understanding something wrong here? I see
> > > > this happens when I have a batch of 2 valid records and in the second
> > > > batch the record is invalid. In that case I want to commit the valid
> > > > records. So I called commit and sent an empty list for the current
> > batch
> > > to
> > > > poll() and then when the next file comes in and poll sees new
> records,
> > I
> > > > see InvalidProducerEpochException.
> > > > Please advise me.
> > > >
> > > > Thanks,
> > > > Nitty
> > > >
> > > > On Mon, Mar 13, 2023 at 5:33 PM NITTY BENNY 
> > > wrote:
> > > >
> > > >> Hi Chris,
> > > >>
> > > >> The difference is in the Task Classes, no difference for value/key
> > > >> convertors.
> > > >>
> > > >> I don’t see log messages for graceful shutdown. I am not clear on
> what
> > > >> you mean by shutting down the task.
> > > >>
> > > >> I called the commit operation for the successful records. Should I
> > > >> perform any other steps if I have an invalid record?
> > > >> Please advise.
> > > >>
> > > >> Thanks,
> > > >> Nitty
> > > >>
> > > >> On Mon, Mar 13, 2023 at 3:42 PM Chris Egerton
>  > >
> > > >> wrote:
> > > >>
> > > >>> Hi Nitty,
> > > >>>
> > > >>> Thanks again for all the details here, especially the log messages.
> > > >>>
> > > >>> > The below mentioned issue is happening for Json connector only.
> Is
> > > >>> there
> > > >>> any difference with asn1,binary,csv and json connector?
> > > >>>
> > > >>> Can you clarify if the difference here is in the Connector/Task
> > > classes,
> > > >>> or if it's in the key/value converters that are configured for the
> > > >>> connector? The key/value converters are configured using t

Re: Exactly once kafka connect query

2023-03-15 Thread NITTY BENNY
Hi Chris,

We won't be able to share the source code since it is proprietary Amdocs
code.

If you have time for a call, I can show you the code and
reproduction scenario over the call. I strongly believe you can find the
issue with that.

Thanks,
Nitty

On Tue, Mar 14, 2023 at 3:04 PM Chris Egerton 
wrote:

> Hi Nitty,
>
> Sorry, I should have clarified. The reason I'm thinking about shutdown here
> is that, when exactly-once support is enabled on a Kafka Connect cluster
> and a new set of task configurations is generated for a connector, the
> Connect framework makes an effort to shut down all the old task instances
> for that connector, and then fences out the transactional producers for all
> of those instances. I was thinking that this may lead to the producer
> exceptions you are seeing but, after double-checking this assumption, that
> does not appear to be the case.
>
> Would it be possible to share the source code for your connector and a
> reproduction scenario for what you're seeing? That may be easier than
> coordinating a call.
>
> Cheers,
>
> Chris
>
> On Tue, Mar 14, 2023 at 6:15 AM NITTY BENNY  wrote:
>
> > Hi Chris,
> >
> > Is there any possibility to have a call with you? This is actually
> blocking
> > our delivery, and I really want to sort this out.
> >
> > Thanks,
> > Nitty
> >
> > On Mon, Mar 13, 2023 at 8:18 PM NITTY BENNY 
> wrote:
> >
> > > Hi Chris,
> > >
> > > I really don't understand why a graceful shutdown will happen during a
> > > commit operation? Am I understanding something wrong here? I see
> > > this happens when I have a batch of 2 valid records and in the second
> > > batch the record is invalid. In that case I want to commit the valid
> > > records. So I called commit and sent an empty list for the current
> batch
> > to
> > > poll() and then when the next file comes in and poll sees new records,
> I
> > > see InvalidProducerEpochException.
> > > Please advise me.
> > >
> > > Thanks,
> > > Nitty
> > >
> > > On Mon, Mar 13, 2023 at 5:33 PM NITTY BENNY 
> > wrote:
> > >
> > >> Hi Chris,
> > >>
> > >> The difference is in the Task Classes, no difference for value/key
> > >> convertors.
> > >>
> > >> I don’t see log messages for graceful shutdown. I am not clear on what
> > >> you mean by shutting down the task.
> > >>
> > >> I called the commit operation for the successful records. Should I
> > >> perform any other steps if I have an invalid record?
> > >> Please advise.
> > >>
> > >> Thanks,
> > >> Nitty
> > >>
> > >> On Mon, Mar 13, 2023 at 3:42 PM Chris Egerton  >
> > >> wrote:
> > >>
> > >>> Hi Nitty,
> > >>>
> > >>> Thanks again for all the details here, especially the log messages.
> > >>>
> > >>> > The below mentioned issue is happening for Json connector only. Is
> > >>> there
> > >>> any difference with asn1,binary,csv and json connector?
> > >>>
> > >>> Can you clarify if the difference here is in the Connector/Task
> > classes,
> > >>> or if it's in the key/value converters that are configured for the
> > >>> connector? The key/value converters are configured using the
> > >>> "key.converter" and "value.converter" property and, if problems arise
> > >>> with
> > >>> them, the task will fail and, if it has a non-empty ongoing
> > transaction,
> > >>> that transaction will be automatically aborted since we close the
> > task's
> > >>> Kafka producer when it fails (or shuts down gracefully).
> > >>>
> > >>> With regards to these log messages:
> > >>>
> > >>> > org.apache.kafka.common.errors.ProducerFencedException: There is a
> > >>> newer
> > >>> producer with the same transactionalId which fences the current one.
> > >>>
> > >>> It looks like your tasks aren't shutting down gracefully in time,
> which
> > >>> causes them to be fenced out by the Connect framework later on. Do
> you
> > >>> see
> > >>> messages like "Graceful stop of task  failed" in the
> logs
> > >>> for
> > >>> your Connect worker?
> > >>>
> > >>> Cheers,
> > >>>
> > >>> Chris
> > >>>
> > >>> On Mon, Mar 13, 2023 at 10:58 AM NITTY BENNY 
> > >>> wrote:
> > >>>
> > >>> > Hi Chris,
> > >>> >
> > >>> > As you said, the below message is coming when I call an abort if
> > there
> > >>> is
> > >>> > an invalid record, then for the next transaction I can see the
> below
> > >>> > message and then the connector will be stopped.
> > >>> > 2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0]
> > >>> Aborting
> > >>> > transaction for batch as requested by connector
> > >>> > (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
> > >>> > [task-thread-json-sftp-source-connector-0]
> > >>> > 2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0]
> > >>> [Producer
> > >>> > clientId=connector-producer-json-sftp-source-connector-0,
> > >>> > transactionalId=connect-cluster-json-sftp-source-connector-0]
> > Aborting
> > >>> > incomplete transaction
> > >>> (org.apache.kafka.clients.producer.KafkaProducer)
> > >>> > [task-thread-json-sftp-source-connector-0]
>

Re: Exactly once kafka connect query

2023-03-14 Thread Chris Egerton
Hi Nitty,

Sorry, I should have clarified. The reason I'm thinking about shutdown here
is that, when exactly-once support is enabled on a Kafka Connect cluster
and a new set of task configurations is generated for a connector, the
Connect framework makes an effort to shut down all the old task instances
for that connector, and then fences out the transactional producers for all
of those instances. I was thinking that this may lead to the producer
exceptions you are seeing but, after double-checking this assumption, that
does not appear to be the case.
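
For context, a minimal sketch of the configuration that puts a cluster into
this mode (the property names come from KIP-618; the values shown are
illustrative and not taken from the setup discussed in this thread):

  # Connect worker configuration (distributed mode)
  exactly.once.source.support=enabled

  # Source connector configuration
  exactly.once.support=required
  # Let the connector define its own transaction boundaries via the
  # TransactionContext API (other allowed values: poll, interval)
  transaction.boundary=connector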

Would it be possible to share the source code for your connector and a
reproduction scenario for what you're seeing? That may be easier than
coordinating a call.

Cheers,

Chris

On Tue, Mar 14, 2023 at 6:15 AM NITTY BENNY  wrote:

> Hi Chris,
>
> Is there any possibility to have a call with you? This is actually blocking
> our delivery, and I really want to sort this out.
>
> Thanks,
> Nitty
>
> On Mon, Mar 13, 2023 at 8:18 PM NITTY BENNY  wrote:
>
> > Hi Chris,
> >
> > I really don't understand why a graceful shutdown will happen during a
> > commit operation? Am I understanding something wrong here? I see
> > this happens when I have a batch of 2 valid records and in the second
> > batch the record is invalid. In that case I want to commit the valid
> > records. So I called commit and sent an empty list for the current batch
> to
> > poll() and then when the next file comes in and poll sees new records, I
> > see InvalidProducerEpochException.
> > Please advise me.
> >
> > Thanks,
> > Nitty
> >
> > On Mon, Mar 13, 2023 at 5:33 PM NITTY BENNY 
> wrote:
> >
> >> Hi Chris,
> >>
> >> The difference is in the Task Classes, no difference for value/key
> >> convertors.
> >>
> >> I don’t see log messages for graceful shutdown. I am not clear on what
> >> you mean by shutting down the task.
> >>
> >> I called the commit operation for the successful records. Should I
> >> perform any other steps if I have an invalid record?
> >> Please advise.
> >>
> >> Thanks,
> >> Nitty
> >>
> >> On Mon, Mar 13, 2023 at 3:42 PM Chris Egerton 
> >> wrote:
> >>
> >>> Hi Nitty,
> >>>
> >>> Thanks again for all the details here, especially the log messages.
> >>>
> >>> > The below mentioned issue is happening for Json connector only. Is
> >>> there
> >>> any difference with asn1,binary,csv and json connector?
> >>>
> >>> Can you clarify if the difference here is in the Connector/Task
> classes,
> >>> or if it's in the key/value converters that are configured for the
> >>> connector? The key/value converters are configured using the
> >>> "key.converter" and "value.converter" property and, if problems arise
> >>> with
> >>> them, the task will fail and, if it has a non-empty ongoing
> transaction,
> >>> that transaction will be automatically aborted since we close the
> task's
> >>> Kafka producer when it fails (or shuts down gracefully).
> >>>
> >>> With regards to these log messages:
> >>>
> >>> > org.apache.kafka.common.errors.ProducerFencedException: There is a
> >>> newer
> >>> producer with the same transactionalId which fences the current one.
> >>>
> >>> It looks like your tasks aren't shutting down gracefully in time, which
> >>> causes them to be fenced out by the Connect framework later on. Do you
> >>> see
> >>> messages like "Graceful stop of task  failed" in the logs
> >>> for
> >>> your Connect worker?
> >>>
> >>> Cheers,
> >>>
> >>> Chris
> >>>
> >>> On Mon, Mar 13, 2023 at 10:58 AM NITTY BENNY 
> >>> wrote:
> >>>
> >>> > Hi Chris,
> >>> >
> >>> > As you said, the below message is coming when I call an abort if
> there
> >>> is
> >>> > an invalid record, then for the next transaction I can see the below
> >>> > message and then the connector will be stopped.
> >>> > 2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0]
> >>> Aborting
> >>> > transaction for batch as requested by connector
> >>> > (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
> >>> > [task-thread-json-sftp-source-connector-0]
> >>> > 2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0]
> >>> [Producer
> >>> > clientId=connector-producer-json-sftp-source-connector-0,
> >>> > transactionalId=connect-cluster-json-sftp-source-connector-0]
> Aborting
> >>> > incomplete transaction
> >>> (org.apache.kafka.clients.producer.KafkaProducer)
> >>> > [task-thread-json-sftp-source-connector-0]
> >>> >
> >>> > The issue with InvalidProducerEpoch is happening when I call the
> >>> commit if
> >>> > there is an invalid record, and in the next transaction I am getting
> >>> > InvalidProducerEpoch Exception and the messages are copied in the
> >>> previous
> >>> > email. I don't know if this will also be fixed by your bug fix. I am
> >>> using
> >>> > kafka 3.3.1 version as of now.
> >>> >
> >>> > Thanks,
> >>> > Nitty
> >>> >
> >>> >
> >>> > On Mon, Mar 13, 2023 at 10:47 AM NITTY BENNY 
> >>> wrote:
> >>> >
> >>> > > Hi Chris,
> >>> > >
> >>> > > The below mentioned issue is happe

Re: Exactly once kafka connect query

2023-03-14 Thread NITTY BENNY
Hi Chris,

Is there any possibility to have a call with you? This is actually blocking
our delivery, and I really want to sort this out.

Thanks,
Nitty

On Mon, Mar 13, 2023 at 8:18 PM NITTY BENNY  wrote:

> Hi Chris,
>
> I really don't understand why a graceful shutdown will happen during a
> commit operation? Am I understanding something wrong here? I see
> this happens when I have a batch of 2 valid records and in the second
> batch the record is invalid. In that case I want to commit the valid
> records. So I called commit and sent an empty list for the current batch to
> poll() and then when the next file comes in and poll sees new records, I
> see InvalidProducerEpochException.
> Please advise me.
>
> Thanks,
> Nitty
>
> On Mon, Mar 13, 2023 at 5:33 PM NITTY BENNY  wrote:
>
>> Hi Chris,
>>
>> The difference is in the Task Classes, no difference for value/key
>> convertors.
>>
>> I don’t see log messages for graceful shutdown. I am not clear on what
>> you mean by shutting down the task.
>>
>> I called the commit operation for the successful records. Should I
>> perform any other steps if I have an invalid record?
>> Please advise.
>>
>> Thanks,
>> Nitty
>>
>> On Mon, Mar 13, 2023 at 3:42 PM Chris Egerton 
>> wrote:
>>
>>> Hi Nitty,
>>>
>>> Thanks again for all the details here, especially the log messages.
>>>
>>> > The below mentioned issue is happening for Json connector only. Is
>>> there
>>> any difference with asn1,binary,csv and json connector?
>>>
>>> Can you clarify if the difference here is in the Connector/Task classes,
>>> or if it's in the key/value converters that are configured for the
>>> connector? The key/value converters are configured using the
>>> "key.converter" and "value.converter" property and, if problems arise
>>> with
>>> them, the task will fail and, if it has a non-empty ongoing transaction,
>>> that transaction will be automatically aborted since we close the task's
>>> Kafka producer when it fails (or shuts down gracefully).
>>>
>>> With regards to these log messages:
>>>
>>> > org.apache.kafka.common.errors.ProducerFencedException: There is a
>>> newer
>>> producer with the same transactionalId which fences the current one.
>>>
>>> It looks like your tasks aren't shutting down gracefully in time, which
>>> causes them to be fenced out by the Connect framework later on. Do you
>>> see
>>> messages like "Graceful stop of task  failed" in the logs
>>> for
>>> your Connect worker?
>>>
>>> Cheers,
>>>
>>> Chris
>>>
>>> On Mon, Mar 13, 2023 at 10:58 AM NITTY BENNY 
>>> wrote:
>>>
>>> > Hi Chris,
>>> >
>>> > As you said, the below message is coming when I call an abort if there
>>> is
>>> > an invalid record, then for the next transaction I can see the below
>>> > message and then the connector will be stopped.
>>> > 2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0]
>>> Aborting
>>> > transaction for batch as requested by connector
>>> > (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
>>> > [task-thread-json-sftp-source-connector-0]
>>> > 2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0]
>>> [Producer
>>> > clientId=connector-producer-json-sftp-source-connector-0,
>>> > transactionalId=connect-cluster-json-sftp-source-connector-0] Aborting
>>> > incomplete transaction
>>> (org.apache.kafka.clients.producer.KafkaProducer)
>>> > [task-thread-json-sftp-source-connector-0]
>>> >
>>> > The issue with InvalidProducerEpoch is happening when I call the
>>> commit if
>>> > there is an invalid record, and in the next transaction I am getting
>>> > InvalidProducerEpoch Exception and the messages are copied in the
>>> previous
>>> > email. I don't know if this will also be fixed by your bug fix. I am
>>> using
>>> > kafka 3.3.1 version as of now.
>>> >
>>> > Thanks,
>>> > Nitty
>>> >
>>> >
>>> > On Mon, Mar 13, 2023 at 10:47 AM NITTY BENNY 
>>> wrote:
>>> >
>>> > > Hi Chris,
>>> > >
>>> > > The below mentioned issue is happening for Json connector only. Is
>>> there
>>> > > any difference with asn1,binary,csv and json connector?
>>> > >
>>> > > Thanks,
>>> > > Nitty
>>> > >
>>> > > On Mon, Mar 13, 2023 at 9:16 AM NITTY BENNY 
>>> > wrote:
>>> > >
>>> > >> Hi Chris,
>>> > >>
>>> > >> Sorry Chris, I am not able to reproduce the above issue.
>>> > >>
>>> > >> I want to share with you one more use case I found.
>>> > >> The use case is in the first batch it returns 2 valid records and
>>> then
>>> > in
> >>> > >> the next batch it is an invalid record. Below is the
>>> transaction_state
>>> > >> topic, when I call a commit while processing an invalid record.
>>> > >>
>>> > >>
>>> >
>>> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
>>> > >> producerId=11, producerEpoch=2, txnTimeoutMs=6, state=*Ongoing*,
>>> > >> pendingState=None, topicPartitions=HashSet(streams-input-2),
>>> > >> txnStartTimestamp=1678620463834,
>>> txnLastUpdateTimestamp=1678620463834)
>>> > >>
>>

Re: Exactly once kafka connect query

2023-03-13 Thread NITTY BENNY
Hi Chris,

I really don't understand why a graceful shutdown will happen during a
commit operation? Am I understanding something wrong here? I see
this happens when I have a batch of 2 valid records and in the second batch
the record is invalid. In that case I want to commit the valid records. So
I called commit and sent an empty list for the current batch to poll() and
then when the next file comes in and poll sees new records, I see
InvalidProducerEpochException.
Please advise me.
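
A minimal sketch of the poll() pattern described above (assuming
connector-defined transaction boundaries; the ParsedRecord type and the
helper methods are placeholders, not the actual connector code):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.TransactionContext;

public abstract class FileParsingSourceTask extends SourceTask {

    private TransactionContext transactionContext;

    @Override
    public void start(Map<String, String> props) {
        // Null unless the worker has exactly-once support enabled and the
        // connector is configured with transaction.boundary=connector.
        transactionContext = context.transactionContext();
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> batch = new ArrayList<>();
        for (ParsedRecord parsed : readNextBatch()) {      // placeholder helper
            if (!parsed.isValid()) {
                // Commit the transaction holding the valid records returned
                // from earlier polls, and hand back an empty batch this time.
                if (transactionContext != null) {
                    transactionContext.commitTransaction();
                }
                moveOnToNextFile();                        // placeholder helper
                return new ArrayList<>();
            }
            batch.add(toSourceRecord(parsed));             // placeholder helper
        }
        return batch;
    }

    // Placeholders standing in for the connector's own parsing logic.
    protected abstract List<ParsedRecord> readNextBatch();
    protected abstract void moveOnToNextFile();
    protected abstract SourceRecord toSourceRecord(ParsedRecord parsed);

    protected interface ParsedRecord {
        boolean isValid();
    }
}

Whether to silently skip invalid data this way or fail the task is, as noted
further down the thread, probably best left configurable.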

Thanks,
Nitty

On Mon, Mar 13, 2023 at 5:33 PM NITTY BENNY  wrote:

> Hi Chris,
>
> The difference is in the Task Classes, no difference for value/key
> convertors.
>
> I don’t see log messages for graceful shutdown. I am not clear on what you
> mean by shutting down the task.
>
> I called the commit operation for the successful records. Should I perform
> any other steps if I have an invalid record?
> Please advise.
>
> Thanks,
> Nitty
>
> On Mon, Mar 13, 2023 at 3:42 PM Chris Egerton 
> wrote:
>
>> Hi Nitty,
>>
>> Thanks again for all the details here, especially the log messages.
>>
>> > The below mentioned issue is happening for Json connector only. Is there
>> any difference with asn1,binary,csv and json connector?
>>
>> Can you clarify if the difference here is in the Connector/Task classes,
>> or if it's in the key/value converters that are configured for the
>> connector? The key/value converters are configured using the
>> "key.converter" and "value.converter" property and, if problems arise with
>> them, the task will fail and, if it has a non-empty ongoing transaction,
>> that transaction will be automatically aborted since we close the task's
>> Kafka producer when it fails (or shuts down gracefully).
>>
>> With regards to these log messages:
>>
>> > org.apache.kafka.common.errors.ProducerFencedException: There is a newer
>> producer with the same transactionalId which fences the current one.
>>
>> It looks like your tasks aren't shutting down gracefully in time, which
>> causes them to be fenced out by the Connect framework later on. Do you see
>> messages like "Graceful stop of task  failed" in the logs
>> for
>> your Connect worker?
>>
>> Cheers,
>>
>> Chris
>>
>> On Mon, Mar 13, 2023 at 10:58 AM NITTY BENNY 
>> wrote:
>>
>> > Hi Chris,
>> >
>> > As you said, the below message is coming when I call an abort if there
>> is
>> > an invalid record, then for the next transaction I can see the below
>> > message and then the connector will be stopped.
>> > 2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0]
>> Aborting
>> > transaction for batch as requested by connector
>> > (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
>> > [task-thread-json-sftp-source-connector-0]
>> > 2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0]
>> [Producer
>> > clientId=connector-producer-json-sftp-source-connector-0,
>> > transactionalId=connect-cluster-json-sftp-source-connector-0] Aborting
>> > incomplete transaction (org.apache.kafka.clients.producer.KafkaProducer)
>> > [task-thread-json-sftp-source-connector-0]
>> >
>> > The issue with InvalidProducerEpoch is happening when I call the commit
>> if
>> > there is an invalid record, and in the next transaction I am getting
>> > InvalidProducerEpoch Exception and the messages are copied in the
>> previous
>> > email. I don't know if this will also be fixed by your bug fix. I am
>> using
>> > kafka 3.3.1 version as of now.
>> >
>> > Thanks,
>> > Nitty
>> >
>> >
>> > On Mon, Mar 13, 2023 at 10:47 AM NITTY BENNY 
>> wrote:
>> >
>> > > Hi Chris,
>> > >
>> > > The below mentioned issue is happening for Json connector only. Is
>> there
>> > > any difference with asn1,binary,csv and json connector?
>> > >
>> > > Thanks,
>> > > Nitty
>> > >
>> > > On Mon, Mar 13, 2023 at 9:16 AM NITTY BENNY 
>> > wrote:
>> > >
>> > >> Hi Chris,
>> > >>
>> > >> Sorry Chris, I am not able to reproduce the above issue.
>> > >>
>> > >> I want to share with you one more use case I found.
>> > >> The use case is in the first batch it returns 2 valid records and
>> then
>> > in
> >> > >> the next batch it is an invalid record. Below is the transaction_state
>> > >> topic, when I call a commit while processing an invalid record.
>> > >>
>> > >>
>> >
>> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
>> > >> producerId=11, producerEpoch=2, txnTimeoutMs=6, state=*Ongoing*,
>> > >> pendingState=None, topicPartitions=HashSet(streams-input-2),
>> > >> txnStartTimestamp=1678620463834,
>> txnLastUpdateTimestamp=1678620463834)
>> > >>
>> > >> then after some time I saw the below states as well,
>> > >>
>> > >>
>> >
>> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
>> > >> producerId=11, producerEpoch=3, txnTimeoutMs=6,
>> > state=*PrepareAbort*,
>> > >> pendingState=None, topicPartitions=HashSet(streams-input-2),
>> > >

Re: Exactly once kafka connect query

2023-03-13 Thread NITTY BENNY
Hi Chris,

The difference is in the Task Classes, no difference for value/key
convertors.

I don’t see log messages for graceful shutdown. I am not clear on what you
mean by shutting down the task.

I called the commit operation for the successful records. Should I perform
any other steps if I have an invalid record?
Please advise.

Thanks,
Nitty

On Mon, Mar 13, 2023 at 3:42 PM Chris Egerton 
wrote:

> Hi Nitty,
>
> Thanks again for all the details here, especially the log messages.
>
> > The below mentioned issue is happening for Json connector only. Is there
> any difference with asn1,binary,csv and json connector?
>
> Can you clarify if the difference here is in the Connector/Task classes,
> or if it's in the key/value converters that are configured for the
> connector? The key/value converters are configured using the
> "key.converter" and "value.converter" property and, if problems arise with
> them, the task will fail and, if it has a non-empty ongoing transaction,
> that transaction will be automatically aborted since we close the task's
> Kafka producer when it fails (or shuts down gracefully).
>
> With regards to these log messages:
>
> > org.apache.kafka.common.errors.ProducerFencedException: There is a newer
> producer with the same transactionalId which fences the current one.
>
> It looks like your tasks aren't shutting down gracefully in time, which
> causes them to be fenced out by the Connect framework later on. Do you see
> messages like "Graceful stop of task  failed" in the logs for
> your Connect worker?
>
> Cheers,
>
> Chris
>
> On Mon, Mar 13, 2023 at 10:58 AM NITTY BENNY  wrote:
>
> > Hi Chris,
> >
> > As you said, the below message is coming when I call an abort if there is
> > an invalid record, then for the next transaction I can see the below
> > message and then the connector will be stopped.
> > 2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0] Aborting
> > transaction for batch as requested by connector
> > (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
> > [task-thread-json-sftp-source-connector-0]
> > 2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0]
> [Producer
> > clientId=connector-producer-json-sftp-source-connector-0,
> > transactionalId=connect-cluster-json-sftp-source-connector-0] Aborting
> > incomplete transaction (org.apache.kafka.clients.producer.KafkaProducer)
> > [task-thread-json-sftp-source-connector-0]
> >
> > The issue with InvalidProducerEpoch is happening when I call the commit
> if
> > there is an invalid record, and in the next transaction I am getting
> > InvalidProducerEpoch Exception and the messages are copied in the
> previous
> > email. I don't know if this will also be fixed by your bug fix. I am using
> > kafka 3.3.1 version as of now.
> >
> > Thanks,
> > Nitty
> >
> >
> > On Mon, Mar 13, 2023 at 10:47 AM NITTY BENNY 
> wrote:
> >
> > > Hi Chris,
> > >
> > > The below mentioned issue is happening for Json connector only. Is
> there
> > > any difference with asn1,binary,csv and json connector?
> > >
> > > Thanks,
> > > Nitty
> > >
> > > On Mon, Mar 13, 2023 at 9:16 AM NITTY BENNY 
> > wrote:
> > >
> > >> Hi Chris,
> > >>
> > >> Sorry Chris, I am not able to reproduce the above issue.
> > >>
> > >> I want to share with you one more use case I found.
> > >> The use case is in the first batch it returns 2 valid records and then
> > in
> > >> the next batch it is an invalid record. Below is the transaction_state
> > >> topic, when I call a commit while processing an invalid record.
> > >>
> > >>
> >
> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
> > >> producerId=11, producerEpoch=2, txnTimeoutMs=6, state=*Ongoing*,
> > >> pendingState=None, topicPartitions=HashSet(streams-input-2),
> > >> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620463834)
> > >>
> > >> then after some time I saw the below states as well,
> > >>
> > >>
> >
> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
> > >> producerId=11, producerEpoch=3, txnTimeoutMs=6,
> > state=*PrepareAbort*,
> > >> pendingState=None, topicPartitions=HashSet(streams-input-2),
> > >> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526119)
> > >>
> >
> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
> > >> producerId=11, producerEpoch=3, txnTimeoutMs=6,
> > state=*CompleteAbort*,
> > >> pendingState=None, topicPartitions=HashSet(),
> > >> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526121)
> > >>
> > >> Later for the next transaction, when it returns the first batch below
> is
> > >> the logs I can see.
> > >>
> > >>  Transiting to abortable error state due to
> > >> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
> > >> attempted to produce with an old epoch.

Re: Exactly once kafka connect query

2023-03-13 Thread Chris Egerton
Hi Nitty,

Thanks again for all the details here, especially the log messages.

> The below mentioned issue is happening for Json connector only. Is there
any difference with asn1,binary,csv and json connector?

Can you clarify if the difference here is in the Connector/Task classes,
or if it's in the key/value converters that are configured for the
connector? The key/value converters are configured using the
"key.converter" and "value.converter" property and, if problems arise with
them, the task will fail and, if it has a non-empty ongoing transaction,
that transaction will be automatically aborted since we close the task's
Kafka producer when it fails (or shuts down gracefully).
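
For reference, converters are configured per connector (or worker-wide) with
properties like the ones below; the converter classes shown are the standard
ones that ship with Kafka Connect and are only an example, not necessarily
what this connector uses:

  key.converter=org.apache.kafka.connect.storage.StringConverter
  value.converter=org.apache.kafka.connect.json.JsonConverter
  value.converter.schemas.enable=false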

With regards to these log messages:

> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
producer with the same transactionalId which fences the current one.

It looks like your tasks aren't shutting down gracefully in time, which
causes them to be fenced out by the Connect framework later on. Do you see
messages like "Graceful stop of task  failed" in the logs for
your Connect worker?
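
If that is what is happening, the worker-side setting that controls how long
the framework waits for a graceful stop is shown below (a sketch; the value
is illustrative, the default is 5000 ms):

  # Connect worker configuration
  task.shutdown.graceful.timeout.ms=10000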

Cheers,

Chris

On Mon, Mar 13, 2023 at 10:58 AM NITTY BENNY  wrote:

> Hi Chris,
>
> As you said, the below message is coming when I call an abort if there is
> an invalid record, then for the next transaction I can see the below
> message and then the connector will be stopped.
> 2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0] Aborting
> transaction for batch as requested by connector
> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
> [task-thread-json-sftp-source-connector-0]
> 2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0] [Producer
> clientId=connector-producer-json-sftp-source-connector-0,
> transactionalId=connect-cluster-json-sftp-source-connector-0] Aborting
> incomplete transaction (org.apache.kafka.clients.producer.KafkaProducer)
> [task-thread-json-sftp-source-connector-0]
>
> The issue with InvalidProducerEpoch is happening when I call the commit if
> there is an invalid record, and in the next transaction I am getting
> InvalidProducerEpoch Exception and the messages are copied in the previous
> email. I don't know if this will also be fixed by your bug fix. I am using
> kafka 3.3.1 version as of now.
>
> Thanks,
> Nitty
>
>
> On Mon, Mar 13, 2023 at 10:47 AM NITTY BENNY  wrote:
>
> > Hi Chris,
> >
> > The below mentioned issue is happening for Json connector only. Is there
> > any difference with asn1,binary,csv and json connector?
> >
> > Thanks,
> > Nitty
> >
> > On Mon, Mar 13, 2023 at 9:16 AM NITTY BENNY 
> wrote:
> >
> >> Hi Chris,
> >>
> >> Sorry Chris, I am not able to reproduce the above issue.
> >>
> >> I want to share with you one more use case I found.
> >> The use case is in the first batch it returns 2 valid records and then
> in
> >> the next batch it is an invalid record. Below is the transaction_state
> >> topic, when I call a commit while processing an invalid record.
> >>
> >>
> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
> >> producerId=11, producerEpoch=2, txnTimeoutMs=6, state=*Ongoing*,
> >> pendingState=None, topicPartitions=HashSet(streams-input-2),
> >> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620463834)
> >>
> >> then after some time I saw the below states as well,
> >>
> >>
> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
> >> producerId=11, producerEpoch=3, txnTimeoutMs=6,
> state=*PrepareAbort*,
> >> pendingState=None, topicPartitions=HashSet(streams-input-2),
> >> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526119)
> >>
> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
> >> producerId=11, producerEpoch=3, txnTimeoutMs=6,
> state=*CompleteAbort*,
> >> pendingState=None, topicPartitions=HashSet(),
> >> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526121)
> >>
> >> Later for the next transaction, when it returns the first batch below is
> >> the logs I can see.
> >>
> >>  Transiting to abortable error state due to
> >> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
> >> attempted to produce with an old epoch.
> >> (org.apache.kafka.clients.producer.internals.TransactionManager)
> >> [kafka-producer-network-thread |
> >> connector-producer-json-sftp-source-connector-0]
> >> 2023-03-12 11:32:45,220 ERROR [json-sftp-source-connector|task-0]
> >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to
> send
> >> record to streams-input:
> >> (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
> >> [kafka-producer-network-thread |
> >> connector-producer-json-sftp-source-connector-0]
> >> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
> >> attempted to produce with an old epoch.

Re: Exactly once kafka connect query

2023-03-13 Thread NITTY BENNY
Hi Chris,

As you said, the below message is coming when I call an abort if there is
an invalid record, then for the next transaction I can see the below
message and then the connector will be stopped.
2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0] Aborting
transaction for batch as requested by connector
(org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
[task-thread-json-sftp-source-connector-0]
2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0] [Producer
clientId=connector-producer-json-sftp-source-connector-0,
transactionalId=connect-cluster-json-sftp-source-connector-0] Aborting
incomplete transaction (org.apache.kafka.clients.producer.KafkaProducer)
[task-thread-json-sftp-source-connector-0]

The issue with InvalidProducerEpoch is happening when I call the commit if
there is an invalid record, and in the next transaction I am getting
InvalidProducerEpoch Exception and the messages are copied in the previous
email. I don't know if this will also be fixed by your bug fix. I am using
kafka 3.3.1 version as of now.

Thanks,
Nitty


On Mon, Mar 13, 2023 at 10:47 AM NITTY BENNY  wrote:

> Hi Chris,
>
> The below mentioned issue is happening for Json connector only. Is there
> any difference with asn1,binary,csv and json connector?
>
> Thanks,
> Nitty
>
> On Mon, Mar 13, 2023 at 9:16 AM NITTY BENNY  wrote:
>
>> Hi Chris,
>>
>> Sorry Chris, I am not able to reproduce the above issue.
>>
>> I want to share with you one more use case I found.
>> The use case is in the first batch it returns 2 valid records and then in
>> the next batch it is an invalid record. Below is the transaction_state
>> topic, when I call a commit while processing an invalid record.
>>
>> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
>> producerId=11, producerEpoch=2, txnTimeoutMs=6, state=*Ongoing*,
>> pendingState=None, topicPartitions=HashSet(streams-input-2),
>> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620463834)
>>
>> then after some time I saw the below states as well,
>>
>> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
>> producerId=11, producerEpoch=3, txnTimeoutMs=6, state=*PrepareAbort*,
>> pendingState=None, topicPartitions=HashSet(streams-input-2),
>> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526119)
>> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
>> producerId=11, producerEpoch=3, txnTimeoutMs=6, state=*CompleteAbort*,
>> pendingState=None, topicPartitions=HashSet(),
>> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526121)
>>
>> Later for the next transaction, when it returns the first batch below is
>> the logs I can see.
>>
>>  Transiting to abortable error state due to
>> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
>> attempted to produce with an old epoch.
>> (org.apache.kafka.clients.producer.internals.TransactionManager)
>> [kafka-producer-network-thread |
>> connector-producer-json-sftp-source-connector-0]
>> 2023-03-12 11:32:45,220 ERROR [json-sftp-source-connector|task-0]
>> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to send
>> record to streams-input:
>> (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
>> [kafka-producer-network-thread |
>> connector-producer-json-sftp-source-connector-0]
>> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
>> attempted to produce with an old epoch.
>> 2023-03-12 11:32:45,222 INFO [json-sftp-source-connector|task-0]
>> [Producer clientId=connector-producer-json-sftp-source-connector-0,
>> transactionalId=connect-cluster-json-sftp-source-connector-0] Transiting to
>> fatal error state due to
>> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
>> producer with the same transactionalId which fences the current one.
>> (org.apache.kafka.clients.producer.internals.TransactionManager)
>> [kafka-producer-network-thread |
>> connector-producer-json-sftp-source-connector-0]
>> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0]
>> [Producer clientId=connector-producer-json-sftp-source-connector-0,
>> transactionalId=connect-cluster-json-sftp-source-connector-0] Aborting
>> producer batches due to fatal error
>> (org.apache.kafka.clients.producer.internals.Sender)
>> [kafka-producer-network-thread |
>> connector-producer-json-sftp-source-connector-0]
>> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
>> producer with the same transactionalId which fences the current one.
>> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0]
>> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to
>> flush offsets to storage:
> >> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)

Re: Exactly once kafka connect query

2023-03-13 Thread NITTY BENNY
Hi Chris,

The below mentioned issue is happening for Json connector only. Is there
any difference with asn1,binary,csv and json connector?

Thanks,
Nitty

On Mon, Mar 13, 2023 at 9:16 AM NITTY BENNY  wrote:

> Hi Chris,
>
> Sorry Chris, I am not able to reproduce the above issue.
>
> I want to share with you one more use case I found.
> The use case is in the first batch it returns 2 valid records and then in
> the next batch it is an invalid record. Below is the transaction_state
> topic, when I call a commit while processing an invalid record.
>
> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
> producerId=11, producerEpoch=2, txnTimeoutMs=6, state=*Ongoing*,
> pendingState=None, topicPartitions=HashSet(streams-input-2),
> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620463834)
>
> then after some time I saw the below states as well,
>
> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
> producerId=11, producerEpoch=3, txnTimeoutMs=6, state=*PrepareAbort*,
> pendingState=None, topicPartitions=HashSet(streams-input-2),
> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526119)
> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
> producerId=11, producerEpoch=3, txnTimeoutMs=6, state=*CompleteAbort*,
> pendingState=None, topicPartitions=HashSet(),
> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526121)
>
> Later for the next transaction, when it returns the first batch below is
> the logs I can see.
>
>  Transiting to abortable error state due to
> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
> attempted to produce with an old epoch.
> (org.apache.kafka.clients.producer.internals.TransactionManager)
> [kafka-producer-network-thread |
> connector-producer-json-sftp-source-connector-0]
> 2023-03-12 11:32:45,220 ERROR [json-sftp-source-connector|task-0]
> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to send
> record to streams-input:
> (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
> [kafka-producer-network-thread |
> connector-producer-json-sftp-source-connector-0]
> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
> attempted to produce with an old epoch.
> 2023-03-12 11:32:45,222 INFO [json-sftp-source-connector|task-0] [Producer
> clientId=connector-producer-json-sftp-source-connector-0,
> transactionalId=connect-cluster-json-sftp-source-connector-0] Transiting to
> fatal error state due to
> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
> producer with the same transactionalId which fences the current one.
> (org.apache.kafka.clients.producer.internals.TransactionManager)
> [kafka-producer-network-thread |
> connector-producer-json-sftp-source-connector-0]
> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0]
> [Producer clientId=connector-producer-json-sftp-source-connector-0,
> transactionalId=connect-cluster-json-sftp-source-connector-0] Aborting
> producer batches due to fatal error
> (org.apache.kafka.clients.producer.internals.Sender)
> [kafka-producer-network-thread |
> connector-producer-json-sftp-source-connector-0]
> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
> producer with the same transactionalId which fences the current one.
> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0]
> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to
> flush offsets to storage:
> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
> [kafka-producer-network-thread |
> connector-producer-json-sftp-source-connector-0]
> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
> producer with the same transactionalId which fences the current one.
> 2023-03-12 11:32:45,224 ERROR [json-sftp-source-connector|task-0]
> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to send
> record to streams-input:
> (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
> [kafka-producer-network-thread |
> connector-producer-json-sftp-source-connector-0]
> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
> producer with the same transactionalId which fences the current one.
> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0|offsets]
> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to
> commit producer transaction
> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
> [task-thread-json-sftp-source-connector-0]
> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
> producer with the same transactionalId which fences the current one.
> 2023-03-12 11:32:45,225 ERROR [json-sftp-source-connector|task-0]
> ExactlyOnceWo

Re: Exactly once kafka connect query

2023-03-13 Thread NITTY BENNY
Hi Chris,

Sorry Chris, I am not able to reproduce the above issue.

I want to share with you one more use case I found.
The use case is in the first batch it returns 2 valid records and then in
the next batch it is an invalid record. Below is the transaction_state
topic, when I call a commit while processing an invalid record.

connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
producerId=11, producerEpoch=2, txnTimeoutMs=6, state=*Ongoing*,
pendingState=None, topicPartitions=HashSet(streams-input-2),
txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620463834)

then after some time I saw the below states as well,

connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
producerId=11, producerEpoch=3, txnTimeoutMs=6, state=*PrepareAbort*,
pendingState=None, topicPartitions=HashSet(streams-input-2),
txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526119)
connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
producerId=11, producerEpoch=3, txnTimeoutMs=6, state=*CompleteAbort*,
pendingState=None, topicPartitions=HashSet(),
txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526121)

Later for the next transaction, when it returns the first batch below is
the logs I can see.

 Transiting to abortable error state due to
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
attempted to produce with an old epoch.
(org.apache.kafka.clients.producer.internals.TransactionManager)
[kafka-producer-network-thread |
connector-producer-json-sftp-source-connector-0]
2023-03-12 11:32:45,220 ERROR [json-sftp-source-connector|task-0]
ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to send
record to streams-input:
(org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
[kafka-producer-network-thread |
connector-producer-json-sftp-source-connector-0]
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
attempted to produce with an old epoch.
2023-03-12 11:32:45,222 INFO [json-sftp-source-connector|task-0] [Producer
clientId=connector-producer-json-sftp-source-connector-0,
transactionalId=connect-cluster-json-sftp-source-connector-0] Transiting to
fatal error state due to
org.apache.kafka.common.errors.ProducerFencedException: There is a newer
producer with the same transactionalId which fences the current one.
(org.apache.kafka.clients.producer.internals.TransactionManager)
[kafka-producer-network-thread |
connector-producer-json-sftp-source-connector-0]
2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0] [Producer
clientId=connector-producer-json-sftp-source-connector-0,
transactionalId=connect-cluster-json-sftp-source-connector-0] Aborting
producer batches due to fatal error
(org.apache.kafka.clients.producer.internals.Sender)
[kafka-producer-network-thread |
connector-producer-json-sftp-source-connector-0]
org.apache.kafka.common.errors.ProducerFencedException: There is a newer
producer with the same transactionalId which fences the current one.
2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0]
ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to
flush offsets to storage:
(org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
[kafka-producer-network-thread |
connector-producer-json-sftp-source-connector-0]
org.apache.kafka.common.errors.ProducerFencedException: There is a newer
producer with the same transactionalId which fences the current one.
2023-03-12 11:32:45,224 ERROR [json-sftp-source-connector|task-0]
ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to send
record to streams-input:
(org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
[kafka-producer-network-thread |
connector-producer-json-sftp-source-connector-0]
org.apache.kafka.common.errors.ProducerFencedException: There is a newer
producer with the same transactionalId which fences the current one.
2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0|offsets]
ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to
commit producer transaction
(org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
[task-thread-json-sftp-source-connector-0]
org.apache.kafka.common.errors.ProducerFencedException: There is a newer
producer with the same transactionalId which fences the current one.
2023-03-12 11:32:45,225 ERROR [json-sftp-source-connector|task-0]
ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Task threw an
uncaught and unrecoverable exception. Task is being killed and will not
recover until manually restarted
(org.apache.kafka.connect.runtime.WorkerTask)
[task-thread-json-sftp-source-connector-0]

Do you know why it is showing an abort state even if I call commit?

I tested one more scenario, When I call the co

Re: Exactly once kafka connect query

2023-03-10 Thread Chris Egerton
Hi Nitty,

> I called commitTransaction when I reach the first error record, but
commit is not happening for me. Kafka connect tries to abort the
transaction automatically

This is really interesting--are you certain that your task never invoked
TransactionContext::abortTransaction in this case? I'm looking over the
code base and it seems fairly clear that the only thing that could trigger
a call to KafkaProducer::abortTransaction is a request by the task to abort
a transaction (either for a next batch, or for a specific record). It may
help to run the connector in a debugger and/or look for "Aborting
transaction for batch as requested by connector" or "Aborting transaction
for record on topic  as requested by connector" log
messages (which will be emitted at INFO level by
the org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask class if
the task is requesting an abort).
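
For reference, the two abort variants on TransactionContext look like this in
task code (a minimal sketch; it assumes transactionContext was obtained from
context.transactionContext() and is non-null, and the wrapper class exists
only to hold the examples):

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.TransactionContext;

public final class AbortRequests {

    // Request an abort of the transaction containing the next batch returned
    // from poll(); the framework logs "Aborting transaction for batch as
    // requested by connector" when it honors this.
    static void abortCurrentBatch(TransactionContext transactionContext) {
        transactionContext.abortTransaction();
    }

    // Request an abort once the given record has been processed; the framework
    // logs "Aborting transaction for record on topic ... as requested by
    // connector" for this variant.
    static void abortAfterRecord(TransactionContext transactionContext, SourceRecord record) {
        transactionContext.abortTransaction(record);
    }
}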

Regardless, I'll work on a fix for the bug with aborting empty
transactions. Thanks for helping uncover that one!

Cheers,

Chris

On Thu, Mar 9, 2023 at 6:36 PM NITTY BENNY  wrote:

> Hi Chris,
>
> We have a use case to commit previous successful records and stop the
> processing of the current file and move on with the next file. To achieve
> that I called commitTransaction when I reach the first error record, but
> commit is not happening for me. Kafka connect tries to abort the
> transaction automatically, I checked the _transaction_state topic and
> states marked as PrepareAbort and CompleteAbort. Do you know why kafka
> connect automatically invokes abort instead of the implicit commit I
> called?
> Then as a result, when I try to parse the next file - say ABC, I saw the
> logs "Aborting incomplete transaction" and ERROR: "Failed to send record to
> topic", and we lost the first batch of records from the current transaction
> in the file ABC.
>
> Is it possible that there's a case where an abort is being requested while
> the current transaction is empty (i.e., the task hasn't returned any
> records from SourceTask::poll since the last transaction was
> committed/aborted)? --- Yes, that case is possible for us. There is a case
> > where the first record itself is an error record.
>
> Thanks,
> Nitty
>
> On Thu, Mar 9, 2023 at 3:48 PM Chris Egerton 
> wrote:
>
> > Hi Nitty,
> >
> > Thanks for the code examples and the detailed explanations, this is
> really
> > helpful!
> >
> > > Say if I have a file with 5 records and batch size is 2, and in my 3rd
> > batch I have one error record then in that batch, I don't have a valid
> > record to call commit or abort. But I want to commit all the previous
> > batches that were successfully parsed. How do I do that?
> >
> > An important thing to keep in mind with the TransactionContext API is
> that
> > all records that a task returns from SourceTask::poll are implicitly
> > included in a transaction. Invoking SourceTaskContext::transactionContext
> > doesn't alter this or cause transactions to start being used; everything
> is
> > already in a transaction, and the Connect runtime automatically begins
> > transactions for any records it sees from the task if it hasn't already
> > begun one. It's also valid to return a null or empty list of records from
> > SourceTask::poll. So in this case, you can invoke
> > transactionContext.commitTransaction() (the no-args variant) and return
> an
> > empty batch from SourceTask::poll, which will cause the transaction
> > containing the 4 valid records that were returned in the last 2 batches
> to
> > be committed.
> >
> > FWIW, I would be a little cautious about this approach. Many times it's
> > better to fail fast on invalid data; it might be worth it to at least
> allow
> > users to configure whether the connector fails on invalid data, or
> silently
> > skips over it (which is what happens when transactions are aborted).
> >
> > > Why is abort not working without adding the last record to the list?
> >
> > Is it possible that there's a case where an abort is being requested
> while
> > the current transaction is empty (i.e., the task hasn't returned any
> > records from SourceTask::poll since the last transaction was
> > committed/aborted)? I think this may be a bug in the Connect framework
> > where we don't check to see if a transaction is already open when a task
> > requests that a transaction be aborted, which can cause tasks to fail
> (see
> > https://issues.apache.org/jira/browse/KAFKA-14799 for more details).
> >
> > Cheers,
> >
> > Chris
> >
> >
> > On Wed, Mar 8, 2023 at 6:44 PM NITTY BENNY  wrote:
> >
> > > Hi Chris,
> > >
> > > I am not sure if you are able to see the images I shared with you .
> > > Copying the code snippet below,
> > >
> > >  if (expectedRecordCount >= 0) {
> > > int missingCount = expectedRecordCount - (int) this.
> > > recordOffset() - 1;
> > > if (missingCount > 0) {
> > >   if (transactionContext != null) {
> > > isMissedRecords = true;
> > >   } else

Re: Exactly once kafka connect query

2023-03-09 Thread NITTY BENNY
Hi Chris,

We have a use case to commit previous successful records and stop the
processing of the current file and move on with the next file. To achieve
that I called commitTransaction when I reach the first error record, but
commit is not happening for me. Kafka Connect tries to abort the
transaction automatically; I checked the _transaction_state topic and the
states were marked as PrepareAbort and CompleteAbort. Do you know why Kafka
Connect automatically invokes abort instead of the commit I called?
Then, as a result, when I try to parse the next file - say ABC - I saw the
logs "Aborting incomplete transaction" and the ERROR "Failed to send record to
topic", and we lost the first batch of records from the current transaction
in the file ABC.

Is it possible that there's a case where an abort is being requested while
the current transaction is empty (i.e., the task hasn't returned any
records from SourceTask::poll since the last transaction was
committed/aborted)? --- Yes, that case is possible for us. There is a case
where the first record itself is an error record.

Thanks,
Nitty

On Thu, Mar 9, 2023 at 3:48 PM Chris Egerton 
wrote:

> Hi Nitty,
>
> Thanks for the code examples and the detailed explanations, this is really
> helpful!
>
> > Say if I have a file with 5 records and batch size is 2, and in my 3rd
> batch I have one error record, then in that batch I don't have a valid
> record to call commit or abort. But I want to commit all the previous
> batches that were successfully parsed. How do I do that?
>
> An important thing to keep in mind with the TransactionContext API is that
> all records that a task returns from SourceTask::poll are implicitly
> included in a transaction. Invoking SourceTaskContext::transactionContext
> doesn't alter this or cause transactions to start being used; everything is
> already in a transaction, and the Connect runtime automatically begins
> transactions for any records it sees from the task if it hasn't already
> begun one. It's also valid to return a null or empty list of records from
> SourceTask::poll. So in this case, you can invoke
> transactionContext.commitTransaction() (the no-args variant) and return an
> empty batch from SourceTask::poll, which will cause the transaction
> containing the 4 valid records that were returned in the last 2 batches to
> be committed.
>
> FWIW, I would be a little cautious about this approach. Many times it's
> better to fail fast on invalid data; it might be worth it to at least allow
> users to configure whether the connector fails on invalid data, or silently
> skips over it (which is what happens when transactions are aborted).
>
> > Why is abort not working without adding the last record to the list?
>
> Is it possible that there's a case where an abort is being requested while
> the current transaction is empty (i.e., the task hasn't returned any
> records from SourceTask::poll since the last transaction was
> committed/aborted)? I think this may be a bug in the Connect framework
> where we don't check to see if a transaction is already open when a task
> requests that a transaction be aborted, which can cause tasks to fail (see
> https://issues.apache.org/jira/browse/KAFKA-14799 for more details).
>
> Cheers,
>
> Chris
>
>
> On Wed, Mar 8, 2023 at 6:44 PM NITTY BENNY  wrote:
>
> > Hi Chris,
> >
> > I am not sure if you are able to see the images I shared with you .
> > Copying the code snippet below,
> >
> >  if (expectedRecordCount >= 0) {
> > int missingCount = expectedRecordCount - (int) this.
> > recordOffset() - 1;
> > if (missingCount > 0) {
> >   if (transactionContext != null) {
> > isMissedRecords = true;
> >   } else {
> > throw new DataException(String.format("Missing %d records
> > (expecting %d, actual %d)", missingCount, expectedRecordCount, this.
> > recordOffset()));
> >   }
> > } else if (missingCount < 0) {
> >   if (transactionContext != null) {
> > isMissedRecords = true;
> >   } else {
> > throw new DataException(String.format("Too many records
> > (expecting %d, actual %d)", expectedRecordCount, this.recordOffset()));
> >   }
> > }
> >   }
> >   addLastRecord(records, null, value);
> > }
> >
> >
> >
> >  //asn1 or binary abort
> > if((config.parseErrorThreshold != null && parseErrorCount >=
> > config.parseErrorThreshold
> > && lastbatch && transactionContext != null) || (isMissedRecords
> > && transactionContext != null && lastbatch)) {
> >   log.info("Transaction is aborting");
> > log.info("records = {}", records);
> > if (!records.isEmpty()) {
> >   log.info("with record");
> >
>  transactionContext.abortTransaction(records.get(records.size
> > ()-1));
> > } else {
> >   log.info("without record");
> >

Re: Exactly once kafka connect query

2023-03-09 Thread Chris Egerton
Hi Nitty,

Thanks for the code examples and the detailed explanations, this is really
helpful!

> Say if I have a file with 5 records and batch size is 2, and in my 3rd
batch I have one error record, then in that batch I don't have a valid
record to call commit or abort. But I want to commit all the previous
batches that were successfully parsed. How do I do that?

An important thing to keep in mind with the TransactionContext API is that
all records that a task returns from SourceTask::poll are implicitly
included in a transaction. Invoking SourceTaskContext::transactionContext
doesn't alter this or cause transactions to start being used; everything is
already in a transaction, and the Connect runtime automatically begins
transactions for any records it sees from the task if it hasn't already
begun one. It's also valid to return a null or empty list of records from
SourceTask::poll. So in this case, you can invoke
transactionContext.commitTransaction() (the no-args variant) and return an
empty batch from SourceTask::poll, which will cause the transaction
containing the 4 valid records that were returned in the last 2 batches to
be committed.
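
To make that concrete, here's a minimal sketch of the shape such a poll() could
take. readNextBatch() and hitInvalidRecord() are hypothetical stand-ins for the
connector's own parsing logic (they are not part of any real API), and the
class is left abstract only to keep the example short:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.TransactionContext;

public abstract class CommitOnErrorTaskSketch extends SourceTask {

    private TransactionContext transactionContext;

    @Override
    public void start(Map<String, String> props) {
        // Null unless connector-defined transaction boundaries are enabled.
        transactionContext = context.transactionContext();
    }

    @Override
    public List<SourceRecord> poll() {
        List<SourceRecord> batch = readNextBatch();
        if (hitInvalidRecord() && transactionContext != null) {
            // Commit the transaction containing every record returned from
            // earlier polls, and return an empty batch for this one.
            transactionContext.commitTransaction();
            return Collections.emptyList();
        }
        return batch;
    }

    // Hypothetical hooks standing in for the connector's file parsing.
    protected abstract List<SourceRecord> readNextBatch();

    protected abstract boolean hitInvalidRecord();
}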

FWIW, I would be a little cautious about this approach. Many times it's
better to fail fast on invalid data; it might be worth it to at least allow
users to configure whether the connector fails on invalid data, or silently
skips over it (which is what happens when transactions are aborted).
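
For example, the connector could expose a property along these lines (the name
"invalid.record.behavior" is made up purely for illustration; it is not a
Connect framework setting):

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

public class ErrorHandlingConfigSketch {

    // "fail" would surface an exception from the task, while "skip" would
    // abort the open transaction and move on to the next file.
    public static ConfigDef configDef() {
        return new ConfigDef()
            .define("invalid.record.behavior",
                    Type.STRING,
                    "fail",
                    ConfigDef.ValidString.in("fail", "skip"),
                    Importance.MEDIUM,
                    "Whether to fail the task or abort the open transaction and "
                        + "skip the rest of the file when an invalid record is found.");
    }
}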

> Why is abort not working without adding the last record to the list?

Is it possible that there's a case where an abort is being requested while
the current transaction is empty (i.e., the task hasn't returned any
records from SourceTask::poll since the last transaction was
committed/aborted)? I think this may be a bug in the Connect framework
where we don't check to see if a transaction is already open when a task
requests that a transaction be aborted, which can cause tasks to fail (see
https://issues.apache.org/jira/browse/KAFKA-14799 for more details).

Cheers,

Chris


On Wed, Mar 8, 2023 at 6:44 PM NITTY BENNY  wrote:

> Hi Chris,
>
> I am not sure if you are able to see the images I shared with you .
> Copying the code snippet below,
>
>  if (expectedRecordCount >= 0) {
> int missingCount = expectedRecordCount - (int) this.
> recordOffset() - 1;
> if (missingCount > 0) {
>   if (transactionContext != null) {
> isMissedRecords = true;
>   } else {
> throw new DataException(String.format("Missing %d records
> (expecting %d, actual %d)", missingCount, expectedRecordCount, this.
> recordOffset()));
>   }
> } else if (missingCount < 0) {
>   if (transactionContext != null) {
> isMissedRecords = true;
>   } else {
> throw new DataException(String.format("Too many records
> (expecting %d, actual %d)", expectedRecordCount, this.recordOffset()));
>   }
> }
>   }
>   addLastRecord(records, null, value);
> }
>
>
>
>  //asn1 or binary abort
> if((config.parseErrorThreshold != null && parseErrorCount >=
> config.parseErrorThreshold
> && lastbatch && transactionContext != null) || (isMissedRecords
> && transactionContext != null && lastbatch)) {
>   log.info("Transaction is aborting");
> log.info("records = {}", records);
> if (!records.isEmpty()) {
>   log.info("with record");
>   transactionContext.abortTransaction(records.get(records.size
> ()-1));
> } else {
>   log.info("without record");
>   transactionContext.abortTransaction();
> }
>
> Thanks,
> Nitty
>
> On Wed, Mar 8, 2023 at 11:38 PM NITTY BENNY  wrote:
>
>> Hi Chris,
>> Sorry for the typo in my previous email.
>>
>> Regarding the point 2,* the task returns a batch of records from
>> SourceTask::poll (and, if using*
>>
>>
>> *the per-record API provided by the TransactionContext class, includes
> >> at least one record that should trigger a transaction commit/abort in
> >> that batch)*
>> What if I am using the API without passing a record? We have 2 types of
>> use cases, one where on encountering an error record, we want to commit
>> previous successful batches and disregard the failed record and upcoming
>> batches. In this case we created the transactionContext just before reading
> >> the file (file is our transaction boundary). Say if I have a file with 5
>> records and batch size is 2, and in my 3rd batch I have one error record
> >> then in that batch, I don't have a valid record to call commit or abort. But
>> I want to commit all the previous batches that were successfully parsed.
>> How do I do that?
>>
>> Second use case is where I want to abort a transaction if the record
>> count doesn't match.
>> Code Snippet :
>> [image: image.png]
>> There are no error records in this case. If you see I added the 

Re: Exactly once kafka connect query

2023-03-08 Thread NITTY BENNY
Hi Chris,

I am not sure if you are able to see the images I shared with you .
Copying the code snippet below,

if (expectedRecordCount >= 0) {
  int missingCount = expectedRecordCount - (int) this.recordOffset() - 1;
  if (missingCount > 0) {
    // Fewer records than the header promised: when a transaction is open, only
    // flag the mismatch so the batch can be aborted later, instead of failing the task.
    if (transactionContext != null) {
      isMissedRecords = true;
    } else {
      throw new DataException(String.format("Missing %d records (expecting %d, actual %d)",
          missingCount, expectedRecordCount, this.recordOffset()));
    }
  } else if (missingCount < 0) {
    // More records than expected: same handling as above.
    if (transactionContext != null) {
      isMissedRecords = true;
    } else {
      throw new DataException(String.format("Too many records (expecting %d, actual %d)",
          expectedRecordCount, this.recordOffset()));
    }
  }
}
addLastRecord(records, null, value);
}



// ASN.1 or binary abort
if ((config.parseErrorThreshold != null && parseErrorCount >= config.parseErrorThreshold
        && lastbatch && transactionContext != null)
    || (isMissedRecords && transactionContext != null && lastbatch)) {
  log.info("Transaction is aborting");
  log.info("records = {}", records);
  if (!records.isEmpty()) {
    // Per-record abort: the transaction is aborted right after this record.
    log.info("with record");
    transactionContext.abortTransaction(records.get(records.size() - 1));
  } else {
    // Empty batch: request an abort of the currently open transaction.
    log.info("without record");
    transactionContext.abortTransaction();
  }

Thanks,
Nitty

On Wed, Mar 8, 2023 at 11:38 PM NITTY BENNY  wrote:

> Hi Chris,
> Sorry for the typo in my previous email.
>
> Regarding the point 2,* the task returns a batch of records from
> SourceTask::poll (and, if using*
>
>
> *the per-record API provided by the TransactionContext class, includes
> at least one record that should trigger a transaction commit/abort in
> that batch)*
> What if I am using the API without passing a record? We have 2 types of
> use cases, one where on encountering an error record, we want to commit
> previous successful batches and disregard the failed record and upcoming
> batches. In this case we created the transactionContext just before reading
> the file (file is our transaction boundary). Say if I have a file with 5
> records and batch size is 2, and in my 3rd batch I have one error record
> then in that batch, I don't have a valid record to call commit or abort. But
> I want to commit all the previous batches that were successfully parsed.
> How do I do that?
>
> Second use case is where I want to abort a transaction if the record count
> doesn't match.
> Code Snippet :
> [image: image.png]
> There are no error records in this case. If you see I added the condition
> of transactionContext check to implement exactly once, without
> transaction it was just throwing the exception without calling the
> addLastRecord() method and in the catch block it just logs the message and
> return the list of records without the last record to poll().To make it
> work, I called the method addLastRecord() in this case, so it is not
> throwing the exception and list has last record as well. Then I called the
> abort, everything got aborted. Why is abort not working without adding the
> last record to the list?
> [image: image.png]
>
> Code to call abort.
>
>
>
>
> Thanks,
> Nitty
>
> On Wed, Mar 8, 2023 at 4:26 PM Chris Egerton 
> wrote:
>
>> Hi Nitty,
>>
>> I'm a little confused about what you mean by this part:
>>
>> > transaction is not getting completed because it is not committing the
>> transaction offset.
>>
>> The only conditions required for a transaction to be completed when a
>> connector is defining its own transaction boundaries are:
>>
>> 1. The task requests a transaction commit/abort from the
>> TransactionContext
>> 2. The task returns a batch of records from SourceTask::poll (and, if
>> using
>> the per-record API provided by the TransactionContext class, includes at
>> least one record that should trigger a transaction commit/abort in that
>> batch)
>>
>> The Connect runtime should automatically commit source offsets to Kafka
>> whenever a transaction is completed, either by commit or abort. This is
>> because transactions should only be aborted for data that should never be
>> re-read by the connector; if there is a validation error that should be
>> handled by reconfiguring the connector, then the task should throw an
>> exception instead of aborting the transaction.
>>
>> If possible, do you think you could provide a brief code snippet
>> illustrating what your task is doing that's causing issues?
>>
>> Cheers,
>>
>> Chris (not Chrise 🙂)
>>
>> On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY  wrote:
>>
>> > Hi Chrise,
>> >
>> > Thanks for sharing the details.
>> >
>> > Regarding the use case, For Asn1 source connector we have a use case to
>> > validate number of records in the file with the number of records in the
>> > hea

Re: Exactly once kafka connect query

2023-03-08 Thread NITTY BENNY
Hi Chris,
Sorry for the typo in my previous email.

Regarding the point 2,* the task returns a batch of records from
SourceTask::poll (and, if using*


*the per-record API provided by the TransactionContext class, includes
at least one record that should trigger a transaction commit/abort in
that batch)*
What if I am using the API without passing a record? We have 2 types of use
cases, one where on encountering an error record, we want to commit
previous successful batches and disregard the failed record and upcoming
batches. In this case we created the transactionContext just before reading
the file (file is our transaction boundary).Say if I have a file with 5
records and batch size is 2, and in my 3rd batch I have one error record
then in that batch, I dont have a valid record to call commit or abort. But
I want to commit all the previous batches that were successfully parsed.
How do I do that?

The second use case is where I want to abort a transaction if the record count
doesn't match.
Code snippet:
[image: image.png]
There are no error records in this case. As you can see, I added the
transactionContext check to implement exactly-once. Without a transaction it
was just throwing the exception without calling the addLastRecord() method,
and in the catch block it just logs the message and returns the list of
records, without the last record, to poll(). To make it work, I called the
addLastRecord() method in this case, so it does not throw the exception and
the list has the last record as well. Then I called the abort, and everything
got aborted. Why is abort not working without adding the last record to the
list?
[image: image.png]

Code to call abort.




Thanks,
Nitty

On Wed, Mar 8, 2023 at 4:26 PM Chris Egerton 
wrote:

> Hi Nitty,
>
> I'm a little confused about what you mean by this part:
>
> > transaction is not getting completed because it is not committing the
> transaction offset.
>
> The only conditions required for a transaction to be completed when a
> connector is defining its own transaction boundaries are:
>
> 1. The task requests a transaction commit/abort from the TransactionContext
> 2. The task returns a batch of records from SourceTask::poll (and, if using
> the per-record API provided by the TransactionContext class, includes at
> least one record that should trigger a transaction commit/abort in that
> batch)
>
> The Connect runtime should automatically commit source offsets to Kafka
> whenever a transaction is completed, either by commit or abort. This is
> because transactions should only be aborted for data that should never be
> re-read by the connector; if there is a validation error that should be
> handled by reconfiguring the connector, then the task should throw an
> exception instead of aborting the transaction.
>
> If possible, do you think you could provide a brief code snippet
> illustrating what your task is doing that's causing issues?
>
> Cheers,
>
> Chris (not Chrise 🙂)
>
> On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY  wrote:
>
> > Hi Chrise,
> >
> > Thanks for sharing the details.
> >
> > Regarding the use case, For Asn1 source connector we have a use case to
> > validate number of records in the file with the number of records in the
> > header. So currently, if validation fails we are not sending the last
> > record to the topic. But after introducing exactly once with connector
> > transaction boundary, I can see that if I call an abort when the
> validation
> > fails, transaction is not getting completed because it is not committing
> the
> > transaction offset. I saw that transaction state changed to
> CompleteAbort.
> > So for my next transaction I am getting InvalidProducerEpochException and
> > then task stopped after that. I tried calling the abort after sending
> last
> > record to the topic then transaction getting completed.
> >
> > I don't know if I am doing anything wrong here.
> >
> > Please advise.
> > Thanks,
> > Nitty
> >
> > On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton 
> > wrote:
> >
> > > Hi Nitty,
> > >
> > > We've recently added some documentation on implementing exactly-once
> > source
> > > connectors here:
> > >
> >
> https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors
> > > .
> > > To quote a relevant passage from those docs:
> > >
> > > > In order for a source connector to take advantage of this support, it
> > > must be able to provide meaningful source offsets for each record that
> it
> > > emits, and resume consumption from the external system at the exact
> > > position corresponding to any of those offsets without dropping or
> > > duplicating messages.
> > >
> > > So, as long as your source connector is able to use the Kafka Connect
> > > framework's offsets API correctly, it shouldn't be necessary to make
> any
> > > other code changes to the connector.
> > >
> > > To enable exactly-once support for source connectors on your Connect
> > > cluster, see the docs section here:
> > > https://kafka.apache.org/documentation/#co

Re: Exactly once kafka connect query

2023-03-08 Thread Chris Egerton
Hi Nitty,

I'm a little confused about what you mean by this part:

> transaction is not getting completed because it is not committing the
transaction offset.

The only conditions required for a transaction to be completed when a
connector is defining its own transaction boundaries are:

1. The task requests a transaction commit/abort from the TransactionContext
2. The task returns a batch of records from SourceTask::poll (and, if using
the per-record API provided by the TransactionContext class, includes at
least one record that should trigger a transaction commit/abort in that
batch)

The Connect runtime should automatically commit source offsets to Kafka
whenever a transaction is completed, either by commit or abort. This is
because transactions should only be aborted for data that should never be
re-read by the connector; if there is a validation error that should be
handled by reconfiguring the connector, then the task should throw an
exception instead of aborting the transaction.
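
As a rough illustration of condition 2 with the per-record API (sketch only;
the helper below is not part of any real API, and the caller is assumed to
return the same batch from poll() afterwards, since the request only takes
effect once the batch is returned):

import java.util.List;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.TransactionContext;

public final class PerRecordBoundarySketch {

    private PerRecordBoundarySketch() {}

    // Mark the last record of the batch so the runtime commits (or aborts) the
    // transaction immediately after dispatching that record.
    public static void markLastRecord(TransactionContext transactionContext,
                                      List<SourceRecord> batch,
                                      boolean commit) {
        if (transactionContext == null || batch.isEmpty()) {
            return; // nothing to mark
        }
        SourceRecord last = batch.get(batch.size() - 1);
        if (commit) {
            transactionContext.commitTransaction(last);
        } else {
            transactionContext.abortTransaction(last);
        }
    }
}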

If possible, do you think you could provide a brief code snippet
illustrating what your task is doing that's causing issues?

Cheers,

Chris (not Chrise 🙂)

On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY  wrote:

> Hi Chrise,
>
> Thanks for sharing the details.
>
> Regarding the use case, For Asn1 source connector we have a use case to
> validate number of records in the file with the number of records in the
> header. So currently, if validation fails we are not sending the last
> record to the topic. But after introducing exactly once with connector
> transaction boundary, I can see that if I call an abort when the validation
> fails, transaction is not getting completed because it is not committing the
> transaction offset. I saw that transaction state changed to CompleteAbort.
> So for my next transaction I am getting InvalidProducerEpochException and
> then task stopped after that. I tried calling the abort after sending last
> record to the topic then transaction getting completed.
>
> I don't know if I am doing anything wrong here.
>
> Please advise.
> Thanks,
> Nitty
>
> On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton 
> wrote:
>
> > Hi Nitty,
> >
> > We've recently added some documentation on implementing exactly-once
> source
> > connectors here:
> >
> https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors
> > .
> > To quote a relevant passage from those docs:
> >
> > > In order for a source connector to take advantage of this support, it
> > must be able to provide meaningful source offsets for each record that it
> > emits, and resume consumption from the external system at the exact
> > position corresponding to any of those offsets without dropping or
> > duplicating messages.
> >
> > So, as long as your source connector is able to use the Kafka Connect
> > framework's offsets API correctly, it shouldn't be necessary to make any
> > other code changes to the connector.
> >
> > To enable exactly-once support for source connectors on your Connect
> > cluster, see the docs section here:
> > https://kafka.apache.org/documentation/#connect_exactlyoncesource
> >
> > With regard to transactions, a transactional producer is always created
> > automatically for your connector by the Connect runtime when exactly-once
> > support is enabled on the worker. The only reason to set
> > "transaction.boundary" to "connector" is if your connector would like to
> > explicitly define its own transaction boundaries. In this case, it sounds
> > like that may be what you want; I just want to make sure to call out that in
> > either case, you should not be directly instantiating a producer in your
> > connector code, but let the Kafka Connect runtime do that for you, and
> just
> > worry about returning the right records from SourceTask::poll (and
> possibly
> > defining custom transactions using the TransactionContext API).
> >
> > With respect to your question about committing or aborting in certain
> > circumstances, it'd be useful to know more about your use case, since it
> > may not be necessary to define custom transaction boundaries in your
> > connector at all.
> >
> > Cheers,
> >
> > Chris
> >
> >
> >
> > On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY  wrote:
> >
> > > Hi Team,
> > >
> > > Adding on top of this, I tried creating a TransactionContext object and
> > > calling the commitTransaction and abortTransaction methods in source
> > > connectors.
> > > But the main problem I saw is that if there is any error while parsing
> > the
> > > record, connect is calling an abort but we have a use case to call
> commit
> > > in some cases. Is it a valid use case in terms of kafka connect?
> > >
> > > Another Question - Should I use a transactional producer instead of
> > > creating an object of TransactionContext? Below is the connector
> > > configuration I am using.
> > >
> > >   exactly.once.support: "required"
> > >   transaction.boundary: "connector"
> > >
> > > Could you please help me here?
> > >
> > > Thanks,
> > > Nit

Re: Exactly once kafka connect query

2023-03-07 Thread NITTY BENNY
Hi Chrise,

Thanks for sharing the details.

Regarding the use case, For Asn1 source connector we have a use case to
validate number of records in the file with the number of records in the
header. So currently, if validation fails we are not sending the last
record to the topic. But after introducing exactly once with connector
transaction boundary, I can see that if I call an abort when the validation
fails, the transaction is not getting completed because it is not committing the
transaction offset. I saw that the transaction state changed to CompleteAbort.
So for my next transaction I am getting InvalidProducerEpochException, and
then the task stopped after that. I tried calling the abort after sending the last
record to the topic, and then the transaction completed.

I don't know if I am doing anything wrong here.

Please advise.
Thanks,
Nitty

On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton 
wrote:

> Hi Nitty,
>
> We've recently added some documentation on implementing exactly-once source
> connectors here:
> https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors
> .
> To quote a relevant passage from those docs:
>
> > In order for a source connector to take advantage of this support, it
> must be able to provide meaningful source offsets for each record that it
> emits, and resume consumption from the external system at the exact
> position corresponding to any of those offsets without dropping or
> duplicating messages.
>
> So, as long as your source connector is able to use the Kafka Connect
> framework's offsets API correctly, it shouldn't be necessary to make any
> other code changes to the connector.
>
> To enable exactly-once support for source connectors on your Connect
> cluster, see the docs section here:
> https://kafka.apache.org/documentation/#connect_exactlyoncesource
>
> With regard to transactions, a transactional producer is always created
> automatically for your connector by the Connect runtime when exactly-once
> support is enabled on the worker. The only reason to set
> "transaction.boundary" to "connector" is if your connector would like to
> explicitly define its own transaction boundaries. In this case, it sounds
> like that may be what you want; I just want to make sure to call out that in
> either case, you should not be directly instantiating a producer in your
> connector code, but let the Kafka Connect runtime do that for you, and just
> worry about returning the right records from SourceTask::poll (and possibly
> defining custom transactions using the TransactionContext API).
>
> With respect to your question about committing or aborting in certain
> circumstances, it'd be useful to know more about your use case, since it
> may not be necessary to define custom transaction boundaries in your
> connector at all.
>
> Cheers,
>
> Chris
>
>
>
> On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY  wrote:
>
> > Hi Team,
> >
> > Adding on top of this, I tried creating a TransactionContext object and
> > calling the commitTransaction and abortTransaction methods in source
> > connectors.
> > But the main problem I saw is that if there is any error while parsing
> the
> > record, connect is calling an abort but we have a use case to call commit
> > in some cases. Is it a valid use case in terms of kafka connect?
> >
> > Another Question - Should I use a transactional producer instead of
> > creating an object of TransactionContext? Below is the connector
> > configuration I am using.
> >
> >   exactly.once.support: "required"
> >   transaction.boundary: "connector"
> >
> > Could you please help me here?
> >
> > Thanks,
> > Nitty
> >
> > On Tue, Mar 7, 2023 at 12:29 AM NITTY BENNY 
> wrote:
> >
> > > Hi Team,
> > > I am trying to implement exactly once behavior in our source connector.
> > Is
> > > there any sample source connector implementation available to have a
> look
> > > at?
> > > Regards,
> > > Nitty
> > >
> >
>


Re: Exactly once kafka connect query

2023-03-07 Thread Chris Egerton
Hi Nitty,

We've recently added some documentation on implementing exactly-once source
connectors here:
https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors.
To quote a relevant passage from those docs:

> In order for a source connector to take advantage of this support, it
must be able to provide meaningful source offsets for each record that it
emits, and resume consumption from the external system at the exact
position corresponding to any of those offsets without dropping or
duplicating messages.

So, as long as your source connector is able to use the Kafka Connect
framework's offsets API correctly, it shouldn't be necessary to make any
other code changes to the connector.
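
For a file-based connector, that usually boils down to something like the
following when constructing each record (sketch only; the "filename" and
"position" keys are arbitrary choices for illustration, not framework
requirements):

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

public class OffsetSketch {

    // The source partition identifies the file, and the source offset records
    // the position within it, so the task can resume exactly where it left off.
    public static SourceRecord recordFor(String filename, long position,
                                         String topic, String value) {
        Map<String, ?> sourcePartition = Collections.singletonMap("filename", filename);
        Map<String, ?> sourceOffset = Collections.singletonMap("position", position);
        return new SourceRecord(sourcePartition, sourceOffset, topic,
                Schema.STRING_SCHEMA, value);
    }
}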

To enable exactly-once support for source connectors on your Connect
cluster, see the docs section here:
https://kafka.apache.org/documentation/#connect_exactlyoncesource
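
In short, that boils down to a worker-level property (available since Kafka
3.3; "preparing" is the intermediate value used during a rolling upgrade before
switching everything to "enabled"):

exactly.once.source.support=enabled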

With regard to transactions, a transactional producer is always created
automatically for your connector by the Connect runtime when exactly-once
support is enabled on the worker. The only reason to set
"transaction.boundary" to "connector" is if your connector would like to
explicitly define its own transaction boundaries. In this case, it sounds
like that may be what you want; I just want to make sure to call out that in
either case, you should not be directly instantiating a producer in your
connector code, but let the Kafka Connect runtime do that for you, and just
worry about returning the right records from SourceTask::poll (and possibly
defining custom transactions using the TransactionContext API).

With respect to your question about committing or aborting in certain
circumstances, it'd be useful to know more about your use case, since it
may not be necessary to define custom transaction boundaries in your
connector at all.

Cheers,

Chris



On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY  wrote:

> Hi Team,
>
> Adding on top of this, I tried creating a TransactionContext object and
> calling the commitTransaction and abortTransaction methods in source
> connectors.
> But the main problem I saw is that if there is any error while parsing the
> record, connect is calling an abort but we have a use case to call commit
> in some cases. Is it a valid use case in terms of kafka connect?
>
> Another Question - Should I use a transactional producer instead of
> creating an object of TransactionContext? Below is the connector
> configuration I am using.
>
>   exactly.once.support: "required"
>   transaction.boundary: "connector"
>
> Could you please help me here?
>
> Thanks,
> Nitty
>
> On Tue, Mar 7, 2023 at 12:29 AM NITTY BENNY  wrote:
>
> > Hi Team,
> > I am trying to implement exactly once behavior in our source connector.
> Is
> > there any sample source connector implementation available to have a look
> > at?
> > Regards,
> > Nitty
> >
>