Re: max.poll.interval.ms rebalance override

2023-03-08 Thread Mcs Vemuri
I overlooked the pause/resume mechanism; I think it should work for us.
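
Something like the sketch below is what I'm picturing (topic, group, and the stimulus check are illustrative placeholders, not our actual code): keep calling poll() on a paused consumer, so it satisfies max.poll.interval.ms and joins rebalances quickly without actually consuming anything.

import java.time.Duration;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class LazyPoller {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "lazy-pollers");            // placeholder
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Set.of("my-topic")); // placeholder
            while (true) {
                // Pause or resume based on the external stimulus, but keep
                // calling poll() either way: a paused consumer fetches no
                // records yet still counts as polling, so the default
                // max.poll.interval.ms can be kept and rebalances stay fast.
                if (stimulusReceived()) {
                    consumer.resume(consumer.paused());
                } else {
                    consumer.pause(consumer.assignment());
                }
                ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofSeconds(1));
                records.forEach(r -> process(r.value()));
            }
        }
    }

    private static boolean stimulusReceived() { return false; } // placeholder
    private static void process(String value) {}                // placeholder
}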


On Wednesday, March 8, 2023, 4:52 PM, Mcs Vemuri  wrote:

Hello,
Is there a way to override a rebalance caused by max.poll.interval.ms? We have
a case where there are multiple consumers in the same group, but some of them
might start sooner than the rest, and all consumers can be lazy pollers.
The issue is that we need to set the poll interval to a large value because a
consumer may not always need to poll; instead, it only polls when it receives
some other stimulus. Because of this, when another consumer joins the group,
the rebalance takes very long.
Ideally, the rebalance timeout would have its own config (maybe
max.rebalance.ms?) defaulting to max.poll.interval.ms, but I'm not sure whether
that was already considered and rejected for some other reason.









max.poll.interval.ms rebalance override

2023-03-08 Thread Mcs Vemuri
Hello,
Is there a way to override a rebalance caused by max.poll.interval.ms? We have
a case where there are multiple consumers in the same group, but some of them
might start sooner than the rest, and all consumers can be lazy pollers.
The issue is that we need to set the poll interval to a large value because a
consumer may not always need to poll; instead, it only polls when it receives
some other stimulus. Because of this, when another consumer joins the group,
the rebalance takes very long.
Ideally, the rebalance timeout would have its own config (maybe
max.rebalance.ms?) defaulting to max.poll.interval.ms, but I'm not sure whether
that was already considered and rejected for some other reason.






Re: Exactly once kafka connect query

2023-03-08 Thread NITTY BENNY
Hi Chris,

I am not sure if you are able to see the images I shared with you.
Copying the code snippet below:

if (expectedRecordCount >= 0) {
    int missingCount = expectedRecordCount - (int) this.recordOffset() - 1;
    if (missingCount > 0) {
        // Fewer records than expected: defer to the transaction abort path
        // when exactly-once is on, otherwise fail the poll.
        if (transactionContext != null) {
            isMissedRecords = true;
        } else {
            throw new DataException(String.format(
                "Missing %d records (expecting %d, actual %d)",
                missingCount, expectedRecordCount, this.recordOffset()));
        }
    } else if (missingCount < 0) {
        // More records than expected: same handling.
        if (transactionContext != null) {
            isMissedRecords = true;
        } else {
            throw new DataException(String.format(
                "Too many records (expecting %d, actual %d)",
                expectedRecordCount, this.recordOffset()));
        }
    }
}
addLastRecord(records, null, value);



// ASN.1 or binary abort
if ((config.parseErrorThreshold != null
        && parseErrorCount >= config.parseErrorThreshold
        && lastbatch && transactionContext != null)
    || (isMissedRecords && transactionContext != null && lastbatch)) {
    log.info("Transaction is aborting");
    log.info("records = {}", records);
    if (!records.isEmpty()) {
        // Abort, marking the last record in the batch as the boundary.
        log.info("with record");
        transactionContext.abortTransaction(records.get(records.size() - 1));
    } else {
        // Nothing in the batch: abort without a boundary record.
        log.info("without record");
        transactionContext.abortTransaction();
    }
}

Thanks,
Nitty

On Wed, Mar 8, 2023 at 11:38 PM NITTY BENNY  wrote:

> Hi Chris,
> Sorry for the typo in my previous email.
>
> Regarding point 2, *the task returns a batch of records from
> SourceTask::poll (and, if using the per-record API provided by the
> TransactionContext class, includes at least one record that should trigger
> a transaction commit/abort in that batch)*
> What if I am using the API without passing a record? We have 2 types of
> use cases. In one, on encountering an error record, we want to commit the
> previous successful batches and disregard the failed record and upcoming
> batches. In this case we created the transactionContext just before reading
> the file (the file is our transaction boundary). Say I have a file with 5
> records and the batch size is 2, and in my 3rd batch I have one error
> record; then in that batch I don't have a valid record to call commit or
> abort with. But I want to commit all the previous batches that were
> successfully parsed. How do I do that?
>
> The second use case is where I want to abort a transaction if the record
> count doesn't match.
> Code snippet:
> [image: image.png]
> There are no error records in this case. As you can see, I added the
> transactionContext check to implement exactly-once; without the
> transaction, the code just threw the exception without calling the
> addLastRecord() method, and in the catch block it just logged the message
> and returned the list of records, without the last record, to poll(). To
> make it work, I called the addLastRecord() method in this case, so it no
> longer throws the exception and the list has the last record as well. Then
> I called abort, and everything got aborted. Why does abort not work without
> adding the last record to the list?
> [image: image.png]
>
> Code to call abort.
>
>
>
>
> Thanks,
> Nitty
>
> On Wed, Mar 8, 2023 at 4:26 PM Chris Egerton 
> wrote:
>
>> Hi Nitty,
>>
>> I'm a little confused about what you mean by this part:
>>
>> > transaction is not getting completed because it is not committing the
>> > transaction offset.
>>
>> The only conditions required for a transaction to be completed when a
>> connector is defining its own transaction boundaries are:
>>
>> 1. The task requests a transaction commit/abort from the
>> TransactionContext
>> 2. The task returns a batch of records from SourceTask::poll (and, if
>> using
>> the per-record API provided by the TransactionContext class, includes at
>> least one record that should trigger a transaction commit/abort in that
>> batch)
>>
>> The Connect runtime should automatically commit source offsets to Kafka
>> whenever a transaction is completed, either by commit or abort. This is
>> because transactions should only be aborted for data that should never be
>> re-read by the connector; if there is a validation error that should be
>> handled by reconfiguring the connector, then the task should throw an
>> exception instead of aborting the transaction.
>>
>> If possible, do you think you could provide a brief code snippet
>> illustrating what your task is doing that's causing issues?
>>
>> Cheers,
>>
>> Chris (not Chrise 🙂)
>>
>> On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY  wrote:
>>
>> > Hi Chrise,
>> >
>> > Thanks for sharing the details.
>> >
>> > Regarding the use case: for the ASN.1 source connector, we have a use
>> > case to validate the number of records in the file against the number
>> > of records in the
>> > hea

Re: Exactly once kafka connect query

2023-03-08 Thread NITTY BENNY
Hi Chris,
Sorry for the typo in my previous email.

Regarding point 2, *the task returns a batch of records from
SourceTask::poll (and, if using the per-record API provided by the
TransactionContext class, includes at least one record that should trigger
a transaction commit/abort in that batch)*
What if I am using the API without passing a record? We have 2 types of use
cases. In one, on encountering an error record, we want to commit the
previous successful batches and disregard the failed record and upcoming
batches. In this case we created the transactionContext just before reading
the file (the file is our transaction boundary). Say I have a file with 5
records and the batch size is 2, and in my 3rd batch I have one error
record; then in that batch I don't have a valid record to call commit or
abort with. But I want to commit all the previous batches that were
successfully parsed. How do I do that?
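
For illustration, a sketch of what I'm considering (parseNextBatch() and
errorRecordHit() are hypothetical stand-ins for our parsing logic), assuming
the batch-level TransactionContext.commitTransaction() call, which, if I read
the Javadoc correctly, requests a commit after the next batch returned from
poll() is processed, so no particular record has to be named:

import java.util.List;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.TransactionContext;

public abstract class FileParsingTask extends SourceTask {

    @Override
    public List<SourceRecord> poll() {
        // Null unless exactly-once is enabled with
        // transaction.boundary=connector.
        TransactionContext txCtx = context.transactionContext();
        List<SourceRecord> batch = parseNextBatch();

        if (txCtx != null && errorRecordHit()) {
            // No valid record in this batch to hand to
            // commitTransaction(SourceRecord), so use the batch-level
            // variant: commit once the batch being returned here has been
            // processed. The failed record and later batches are simply
            // never returned, so they stay outside the transaction.
            txCtx.commitTransaction();
        }
        return batch;
    }

    // Hypothetical helpers standing in for the real parsing logic.
    protected abstract List<SourceRecord> parseNextBatch();
    protected abstract boolean errorRecordHit();
}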

The second use case is where I want to abort a transaction if the record
count doesn't match.
Code snippet:
[image: image.png]
There are no error records in this case. As you can see, I added the
transactionContext check to implement exactly-once; without the
transaction, the code just threw the exception without calling the
addLastRecord() method, and in the catch block it just logged the message
and returned the list of records, without the last record, to poll(). To
make it work, I called the addLastRecord() method in this case, so it no
longer throws the exception and the list has the last record as well. Then
I called abort, and everything got aborted. Why does abort not work without
adding the last record to the list?
[image: image.png]

Code to call abort.




Thanks,
Nitty

On Wed, Mar 8, 2023 at 4:26 PM Chris Egerton 
wrote:

> Hi Nitty,
>
> I'm a little confused about what you mean by this part:
>
> > transaction is not getting completed because it is not committing the
> > transaction offset.
>
> The only conditions required for a transaction to be completed when a
> connector is defining its own transaction boundaries are:
>
> 1. The task requests a transaction commit/abort from the TransactionContext
> 2. The task returns a batch of records from SourceTask::poll (and, if using
> the per-record API provided by the TransactionContext class, includes at
> least one record that should trigger a transaction commit/abort in that
> batch)
>
> The Connect runtime should automatically commit source offsets to Kafka
> whenever a transaction is completed, either by commit or abort. This is
> because transactions should only be aborted for data that should never be
> re-read by the connector; if there is a validation error that should be
> handled by reconfiguring the connector, then the task should throw an
> exception instead of aborting the transaction.
>
> If possible, do you think you could provide a brief code snippet
> illustrating what your task is doing that's causing issues?
>
> Cheers,
>
> Chris (not Chrise 🙂)
>
> On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY  wrote:
>
> > Hi Chrise,
> >
> > Thanks for sharing the details.
> >
> > Regarding the use case: for the ASN.1 source connector, we have a use
> > case to validate the number of records in the file against the number of
> > records in the header. So currently, if validation fails we do not send
> > the last record to the topic. But after introducing exactly-once with the
> > connector transaction boundary, I can see that if I call an abort when
> > the validation fails, the transaction is not getting completed because it
> > is not committing the transaction offset. I saw that the transaction
> > state changed to CompleteAbort.
> > So for my next transaction I am getting InvalidProducerEpochException,
> > and the task stopped after that. When I tried calling the abort after
> > sending the last record to the topic, the transaction completed.
> >
> > I don't know if I am doing anything wrong here.
> >
> > Please advise.
> > Thanks,
> > Nitty
> >
> > On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton 
> > wrote:
> >
> > > Hi Nitty,
> > >
> > > We've recently added some documentation on implementing exactly-once
> > > source connectors here:
> > > https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors
> > > To quote a relevant passage from those docs:
> > >
> > > > In order for a source connector to take advantage of this support, it
> > > must be able to provide meaningful source offsets for each record that
> > > it emits, and resume consumption from the external system at the exact
> > > position corresponding to any of those offsets without dropping or
> > > duplicating messages.
> > >
> > > So, as long as your source connector is able to use the Kafka Connect
> > > framework's offsets API correctly, it shouldn't be necessary to make
> > > any other code changes to the connector.
> > >
> > > To enable exactly-once support for source connectors on your Connect
> > > cluster, see the docs section here:
> > > https://kafka.apache.org/documentation/#co

Re: producer purgatory

2023-03-08 Thread David Ballano Fernandez
Thanks Andrew.

I'll give linger.ms a try.

I was testing worst-case scenarios, so linger.ms was set to 0. Also, the
producer was using acks=all, which definitely adds all the produce requests
to the purgatory while they wait to be acknowledged.
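
For reference, the kind of producer setup being discussed here, as a minimal
sketch (values are illustrative, not recommendations):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class LoadTestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");         // wait for all in-sync replicas
        props.put("linger.ms", "5");      // batch briefly instead of 0
        props.put("batch.size", "65536"); // larger batches, fewer requests

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Fewer, larger produce requests should mean fewer entries
            // sitting in the broker's producer purgatory waiting on
            // follower acknowledgement.
            producer.send(new ProducerRecord<>("load-test", "key", "value"));
        }
    }
}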

thanks.

On Sat, Mar 4, 2023 at 2:57 PM Andrew Grant 
wrote:

> Hey David,
>
> The followers replicate from the leader, and when they do that they write
> to their own local log. For the Ceph cluster, it sounds like the followers'
> writes to their local log are slower? Seems like that would make sense if
> those writes are going over the network. This could explain why the leader
> ends up having to wait longer to hear back from the followers before
> sending the produce response, which in turn could explain why the producer
> purgatory is bigger. See the section "Commit time: Replicating the record
> from leader to followers" in
> https://www.confluent.io/blog/configure-kafka-to-minimize-latency/.
>
> To amortize the cost of slower followers you could look into increasing
> linger.ms so that the producer batches a bit more.
>
> Hope that helps a bit.
>
> Andrew
>
> On Mon, Feb 27, 2023 at 3:39 PM David Ballano Fernandez 
>  wrote:
>
> > thank you!
> >
> > On Mon, Feb 27, 2023 at 12:37 PM David Ballano Fernandez <
> > dfernan...@demonware.net> wrote:
> >
> > > Hi guys,
> > >
> > > I am load testing a couple of clusters, one with local SSD disks and
> > > another one with Ceph.
> > >
> > > Both clusters have the same amount of CPU/RAM, and they are configured
> > > the same way.
> > > I'm sending the same amount of messages and producing with linger.ms=0
> > > and acks=all.
> > >
> > > Besides seeing higher latencies on Ceph for the most part compared to
> > > local disk, there is something that I don't understand.
> > >
> > > On the local-disk cluster, messages per second matches exactly the
> > > number of requests, but on the Ceph cluster messages do not match total
> > > produce requests per second.
> > >
> > > The only thing I can find is that the producer purgatory in the Ceph
> > > Kafka cluster has more requests queued up than on the local-disk one.
> > >
> > > Also, RemoteTime-ms for producers is high, which could explain why
> > > there are more requests in the purgatory.
> > >
> > > I think this means that the producer is waiting to hear back for all
> > > the acks, which are set to all. But I don't understand why the
> > > local-disk Kafka cluster's purgatory queue is way lower,
> > > since I don't think disk is used for this. Could network saturation,
> > > given that Ceph is network storage, be interfering with the producer
> > > waiting for acks? Is there a way to tune the producer purgatory? I did
> > > change num.replica.fetchers, but that only lowered the fetch purgatory.
> > >
> > >
> > >
> > >
> > >
> >
>
>


Re: Exactly once kafka connect query

2023-03-08 Thread Chris Egerton
Hi Nitty,

I'm a little confused about what you mean by this part:

> transaction is not getting completed because it is not committing the
> transaction offset.

The only conditions required for a transaction to be completed when a
connector is defining its own transaction boundaries are:

1. The task requests a transaction commit/abort from the TransactionContext
2. The task returns a batch of records from SourceTask::poll (and, if using
the per-record API provided by the TransactionContext class, includes at
least one record that should trigger a transaction commit/abort in that
batch)

The Connect runtime should automatically commit source offsets to Kafka
whenever a transaction is completed, either by commit or abort. This is
because transactions should only be aborted for data that should never be
re-read by the connector; if there is a validation error that should be
handled by reconfiguring the connector, then the task should throw an
exception instead of aborting the transaction.
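
As a rough illustration of those two conditions, a task defining its own
boundaries might look something like the sketch below; readFileBatch() and
isEndOfFile() are made-up helpers, not part of the Connect API:

import java.util.List;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.TransactionContext;

public abstract class TransactionalFileTask extends SourceTask {

    @Override
    public List<SourceRecord> poll() {
        TransactionContext txCtx = context.transactionContext();
        List<SourceRecord> batch = readFileBatch();

        // Condition 1: request the commit (or abort) from the
        // TransactionContext.
        if (txCtx != null && isEndOfFile() && !batch.isEmpty()) {
            // Per-record API: the transaction completes once the batch
            // containing this record has been returned and processed.
            txCtx.commitTransaction(batch.get(batch.size() - 1));
        }

        // Condition 2: the batch containing the triggering record must
        // actually be returned from poll().
        return batch;
    }

    // Made-up helpers standing in for real file-reading logic.
    protected abstract List<SourceRecord> readFileBatch();
    protected abstract boolean isEndOfFile();
}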

If possible, do you think you could provide a brief code snippet
illustrating what your task is doing that's causing issues?

Cheers,

Chris (not Chrise 🙂)

On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY  wrote:

> Hi Chrise,
>
> Thanks for sharing the details.
>
> Regarding the use case: for the ASN.1 source connector, we have a use case
> to validate the number of records in the file against the number of records
> in the header. So currently, if validation fails we do not send the last
> record to the topic. But after introducing exactly-once with the connector
> transaction boundary, I can see that if I call an abort when the validation
> fails, the transaction is not getting completed because it is not committing
> the transaction offset. I saw that the transaction state changed to
> CompleteAbort.
> So for my next transaction I am getting InvalidProducerEpochException, and
> the task stopped after that. When I tried calling the abort after sending
> the last record to the topic, the transaction completed.
>
> I don't know if I am doing anything wrong here.
>
> Please advise.
> Thanks,
> Nitty
>
> On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton 
> wrote:
>
> > Hi Nitty,
> >
> > We've recently added some documentation on implementing exactly-once
> > source connectors here:
> > https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors
> > To quote a relevant passage from those docs:
> >
> > > In order for a source connector to take advantage of this support, it
> > must be able to provide meaningful source offsets for each record that it
> > emits, and resume consumption from the external system at the exact
> > position corresponding to any of those offsets without dropping or
> > duplicating messages.
> >
> > So, as long as your source connector is able to use the Kafka Connect
> > framework's offsets API correctly, it shouldn't be necessary to make any
> > other code changes to the connector.
> >
> > To enable exactly-once support for source connectors on your Connect
> > cluster, see the docs section here:
> > https://kafka.apache.org/documentation/#connect_exactlyoncesource
> >
> > With regard to transactions, a transactional producer is always created
> > automatically for your connector by the Connect runtime when exactly-once
> > support is enabled on the worker. The only reason to set
> > "transaction.boundary" to "connector" is if your connector would like to
> > explicitly define its own transaction boundaries. In this case, it sounds
> > like that may be what you want; I just want to make sure to call out that in
> > either case, you should not be directly instantiating a producer in your
> > connector code, but let the Kafka Connect runtime do that for you, and
> > just worry about returning the right records from SourceTask::poll (and
> > possibly defining custom transactions using the TransactionContext API).
> >
> > With respect to your question about committing or aborting in certain
> > circumstances, it'd be useful to know more about your use case, since it
> > may not be necessary to define custom transaction boundaries in your
> > connector at all.
> >
> > Cheers,
> >
> > Chris
> >
> >
> >
> > On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY  wrote:
> >
> > > Hi Team,
> > >
> > > Adding on top of this, I tried creating a TransactionContext object
> > > and calling the commitTransaction and abortTransaction methods in
> > > source connectors.
> > > But the main problem I saw is that if there is any error while parsing
> > > the record, Connect is calling an abort, but we have a use case to call
> > > commit in some cases. Is that a valid use case in terms of Kafka
> > > Connect?
> > >
> > > Another question: should I use a transactional producer instead of
> > > creating an object of TransactionContext? Below is the connector
> > > configuration I am using:
> > >
> > >   exactly.once.support: "required"
> > >   transaction.boundary: "connector"
> > >
> > > Could you please help me here?
> > >
> > > Thanks,
> > > Nit