[jira] [Created] (KAFKA-16609) Update parse_describe_topic to support new topic describe output

2024-04-23 Thread Kirk True (Jira)
Kirk True created KAFKA-16609:
-

 Summary: Update parse_describe_topic to support new topic describe 
output
 Key: KAFKA-16609
 URL: https://issues.apache.org/jira/browse/KAFKA-16609
 Project: Kafka
  Issue Type: Bug
  Components: admin, system tests
Affects Versions: 3.8.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0


It appears that recent changes to the describe topic output have broken the 
system test's ability to parse the output.

{noformat}
test_id:
kafkatest.tests.core.reassign_partitions_test.ReassignPartitionsTest.test_reassign_partitions.bounce_brokers=False.reassign_from_offset_zero=True.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   50.333 seconds


IndexError('list index out of range')
Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/reassign_partitions_test.py",
 line 175, in test_reassign_partitions
self.run_produce_consume_validate(core_test_action=lambda: 
self.reassign_partitions(bounce_brokers))
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/produce_consume_validate.py",
 line 105, in run_produce_consume_validate
core_test_action(*args)
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/reassign_partitions_test.py",
 line 175, in 
self.run_produce_consume_validate(core_test_action=lambda: 
self.reassign_partitions(bounce_brokers))
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/reassign_partitions_test.py",
 line 82, in reassign_partitions
partition_info = 
self.kafka.parse_describe_topic(self.kafka.describe_topic(self.topic))
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/kafka/kafka.py",
 line 1400, in parse_describe_topic
fields = list(map(lambda x: x.split(" ")[1], fields))
  File 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/kafka/kafka.py",
 line 1400, in 
fields = list(map(lambda x: x.split(" ")[1], fields))
IndexError: list index out of range
{noformat} 
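For illustration only, here is the parsing pattern involved, sketched in Java (the actual parser is the Python helper in tests/kafkatest/services/kafka/kafka.py shown in the traceback, and the sample field names below, including the empty-valued one, are hypothetical): splitting each field on a single space and unconditionally taking the second token fails as soon as a field carries no value after its label, whereas splitting on whitespace runs and checking the token count does not.

{code:java}
import java.util.ArrayList;
import java.util.List;

public class DescribeTopicFieldSketch {

    // Tolerant equivalent of `x.split(" ")[1]`: split on runs of whitespace and
    // fall back to an empty string when a field carries no value.
    static String secondToken(String field) {
        String[] tokens = field.trim().split("\\s+");
        return tokens.length > 1 ? tokens[1] : "";
    }

    public static void main(String[] args) {
        // Hypothetical fields from one describe-output line; "NewField:" has no value.
        List<String> fields = List.of("Partition: 0", "Leader: 1", "Replicas: 1,2,3", "NewField:");
        List<String> values = new ArrayList<>();
        for (String field : fields) {
            values.add(secondToken(field));
        }
        System.out.println(values);   // [0, 1, 1,2,3, ]
    }
}
{code}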



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-04-23 Thread Justine Olshan
Thanks Alieh for the updates.

I'm a little concerned about the design pattern here. It seems like we want
specific usages, but we are packaging it as a generic handler.
I think we tried to narrow down on the specific errors we want to handle,
but it feels a little clunky as we have a generic thing for two specific
errors.

I'm wondering if we are using the right patterns to solve these problems. I
agree, though, that we will need something more than the error classes I'm
proposing if we want different handling to be configurable.
My concern is that the open-endedness of a handler means that we are
creating more problems than we are solving. It is still unclear to me how
we expect to handle the errors. Perhaps we could include an example? It
seems like there is a specific use case in mind and maybe we can make a
design that is tighter and supports that case.

Justine

On Tue, Apr 23, 2024 at 3:06 PM Kirk True  wrote:

> Hi Alieh,
>
> Thanks for the KIP!
>
> A few questions:
>
> K1. What is the expected behavior for the producer if it generates a
> RecordTooLargeException, but the handler returns RETRY?
> K2. How do we determine which Record was responsible for the
> UnknownTopicOrPartitionException since we get that response when sending  a
> batch of records?
> K3. What is the expected behavior if the handle() method itself throws an
> error?
> K4. What is the downside of adding an onError() method to the Producer’s
> Callback interface vs. a new mechanism?
> K5. Can we change “ProducerExceptionHandlerResponse" to just “Response”
> given that it’s an inner enum?
> K6. Any recommendation for callback authors to handle different behavior
> for different topics?
>
> I’ll echo what others have said, it would help me understand why we want
> another handler class if there were more examples in the Motivation
> section. As it stands now, I agree with Chris that the stated issues could
> be solved by adding two new configuration options:
>
> oversized.record.behavior=fail
> retry.on.unknown.topic.or.partition=true
>
> What I’m not yet able to wrap my head around is: what exactly would the
> logic in the handler be? I’m not very imaginative, so I’m assuming they’d
> mostly be if-this-then-that. However, if they’re more complicated, I’d have
> other concerns.
>
> Thanks,
> Kirk
>
> > On Apr 22, 2024, at 7:38 AM, Alieh Saeedi 
> wrote:
> >
> > Thank you all for the feedback!
> >
> > Addressing the main concern: The KIP is about giving the user the ability
> > to handle producer exceptions, but to be more conservative and avoid
> future
> > issues, we decided to be limited to a short list of exceptions. I
> included
> > *RecordTooLargeException* and *UnknownTopicOrPartitionException*. Open to
> > suggestion for adding some more ;-)
> >
> > KIP Updates:
> > - clarified the way that the user should configure the Producer to use
> the
> > custom handler. I think adding a producer config property is the cleanest
> > one.
> > - changed the *ClientExceptionHandler* to *ProducerExceptionHandler* to
> be
> > closer to what we are changing.
> > - added the ProducerRecord as the input parameter of the handle() method
> as
> > well.
> > - increased the response types to 3 to have fail and two types of
> continue.
> > - The default behaviour is having no custom handler, having the
> > corresponding config parameter set to null. Therefore, the KIP provides
> no
> > default implementation of the interface.
> > - We follow the interface solution as described in the
> > Rejected Alternatives section.
> >
> >
> > Cheers,
> > Alieh
> >
> >
> > On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax 
> wrote:
> >
> >> Thanks for the KIP Alieh! It addresses an important case for error
> >> handling.
> >>
> >> I agree that using this handler would be an expert API, as mentioned by
> >> a few people. But I don't think it would be a reason to not add it. It's
> >> always a tricky tradeoff what to expose to users and to avoid foot guns,
> >> but we added similar handlers to Kafka Streams, and have good experience
> >> with it. Hence, I understand, but don't share the concern raised.
> >>
> >> I also agree that there is some responsibility by the user to understand
> >> how such a handler should be implemented to not drop data by accident.
> >> But it seem unavoidable and acceptable.
> >>
> >> While I understand that a "simpler / reduced" API (eg via configs) might
> >> also work, I personally prefer a full handler. Configs have the same
> >> issue that they could be miss-used potentially leading to incorrectly
> >> dropped data, but at the same time are less flexible (and thus maybe
> >> ever harder to use correctly...?). Base on my experience, there is also
> >> often weird corner case for which it make sense to also drop records for
> >> other exceptions, and a full handler has the advantage of full
> >> flexibility and "absolute power!".
> >>
> >> To be fair: I don't know the exact code paths of the producer in
> >> details, so 

Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-04-23 Thread Kirk True
Hi Alieh,

Thanks for the KIP!

A few questions:

K1. What is the expected behavior for the producer if it generates a 
RecordTooLargeException, but the handler returns RETRY?
K2. How do we determine which Record was responsible for the 
UnknownTopicOrPartitionException since we get that response when sending  a 
batch of records?
K3. What is the expected behavior if the handle() method itself throws an error?
K4. What is the downside of adding an onError() method to the Producer’s 
Callback interface vs. a new mechanism?
K5. Can we change “ProducerExceptionHandlerResponse" to just “Response” given 
that it’s an inner enum?
K6. Any recommendation for callback authors to handle different behavior for 
different topics?

I’ll echo what others have said: it would help me understand why we want 
another handler class if there were more examples in the Motivation section. As 
it stands now, I agree with Chris that the stated issues could be solved by 
adding two new configuration options:

oversized.record.behavior=fail
retry.on.unknown.topic.or.partition=true

What I’m not yet able to wrap my head around is: what exactly would the logic 
in the handler be? I’m not very imaginative, so I’m assuming they’d mostly be 
if-this-then-that. However, if they’re more complicated, I’d have other 
concerns.
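
For illustration, a minimal sketch of the kind of if-this-then-that logic in question; the interface, enum values and signature below are assumptions pieced together from this thread (a ProducerExceptionHandler-style handle() method with fail/retry/swallow-style responses), not a published API:

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

interface SketchProducerExceptionHandler {
    enum Response { FAIL, RETRY, SWALLOW }
    Response handle(ProducerRecord<byte[], byte[]> record, Exception exception);
}

class DropOversizedAuditRecords implements SketchProducerExceptionHandler {
    @Override
    public Response handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
        if (exception instanceof RecordTooLargeException && "audit-log".equals(record.topic())) {
            return Response.SWALLOW;   // drop oversized records, but only on this topic
        }
        if (exception instanceof UnknownTopicOrPartitionException) {
            return Response.RETRY;     // topic metadata may simply not have propagated yet
        }
        return Response.FAIL;          // otherwise keep today's fail-fast behaviour
    }
}

Per-topic behaviour (K6) then ends up as ordinary conditionals on record.topic().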

Thanks,
Kirk

> On Apr 22, 2024, at 7:38 AM, Alieh Saeedi  
> wrote:
> 
> Thank you all for the feedback!
> 
> Addressing the main concern: The KIP is about giving the user the ability
> to handle producer exceptions, but to be more conservative and avoid future
> issues, we decided to be limited to a short list of exceptions. I included
> *RecordTooLargeException* and *UnknownTopicOrPartitionException*. Open to
> suggestion for adding some more ;-)
> 
> KIP Updates:
> - clarified the way that the user should configure the Producer to use the
> custom handler. I think adding a producer config property is the cleanest
> one.
> - changed the *ClientExceptionHandler* to *ProducerExceptionHandler* to be
> closer to what we are changing.
> - added the ProducerRecord as the input parameter of the handle() method as
> well.
> - increased the response types to 3 to have fail and two types of continue.
> - The default behaviour is having no custom handler, having the
> corresponding config parameter set to null. Therefore, the KIP provides no
> default implementation of the interface.
> - We follow the interface solution as described in the
> Rejected Alternatives section.
> 
> 
> Cheers,
> Alieh
> 
> 
> On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax  wrote:
> 
>> Thanks for the KIP Alieh! It addresses an important case for error
>> handling.
>> 
>> I agree that using this handler would be an expert API, as mentioned by
>> a few people. But I don't think it would be a reason to not add it. It's
>> always a tricky tradeoff what to expose to users and to avoid foot guns,
>> but we added similar handlers to Kafka Streams, and have good experience
>> with it. Hence, I understand, but don't share the concern raised.
>> 
>> I also agree that there is some responsibility by the user to understand
>> how such a handler should be implemented to not drop data by accident.
>> But it seem unavoidable and acceptable.
>> 
>> While I understand that a "simpler / reduced" API (eg via configs) might
>> also work, I personally prefer a full handler. Configs have the same
>> issue that they could be miss-used potentially leading to incorrectly
>> dropped data, but at the same time are less flexible (and thus maybe
>> ever harder to use correctly...?). Base on my experience, there is also
>> often weird corner case for which it make sense to also drop records for
>> other exceptions, and a full handler has the advantage of full
>> flexibility and "absolute power!".
>> 
>> To be fair: I don't know the exact code paths of the producer in
>> details, so please keep me honest. But my understanding is, that the KIP
>> aims to allow users to react to internal exception, and decide to keep
>> retrying internally, swallow the error and drop the record, or raise the
>> error?
>> 
>> Maybe the KIP would need to be a little bit more precises what error we
>> want to cover -- I don't think this list must be exhaustive, as we can
>> always do follow up KIP to also apply the handler to other errors to
>> expand the scope of the handler. The KIP does mention examples, but it
>> might be good to explicitly state for what cases the handler gets applied?
>> 
>> I am also not sure if CONTINUE and FAIL are enough options? Don't we
>> need three options? Or would `CONTINUE` have different meaning depending
>> on the type of error? Ie, for a retryable error `CONTINUE` would mean
>> keep retrying internally, but for a non-retryable error `CONTINUE` means
>> swallow the error and drop the record? This semantic overload seems
>> tricky to reason about by users, so it might better to split `CONTINUE`
>> into two cases -> `RETRY` and `SWALLOW` (or some better 

[PR] MINOR: Add Igor to committers [kafka-site]

2024-04-23 Thread via GitHub


soarez opened a new pull request, #598:
URL: https://github.com/apache/kafka-site/pull/598

   
   
   https://github.com/apache/kafka-site/assets/1357510/cccdd6e0-45d8-4e1a-949e-304d91815100
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-04-23 Thread Kirk True
Hi Fred,

Thanks for the updates!

Questions:

K11. Can we reconsider the introduction of two new exception subclasses? 
Perhaps I don’t understand the benefit? Technically both the key and the value 
could have deserialization errors, right?

K12. Is there a benefit to exposing both the ByteBuffer and byte[]?

K13. Isn’t it possible for the key() and/or value() calls to throw NPE? (Or 
maybe I don’t understand why we need the old constructor :(
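
On K12, the trade-off is roughly that the fetched payload arrives as a ByteBuffer and producing a byte[] requires a copy; a minimal sketch of that conversion (class and method names here are illustrative only):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BufferVersusArraySketch {

    // Copying a ByteBuffer's remaining bytes into a byte[]; exposing only byte[]
    // forces this copy on every access, exposing the ByteBuffer leaves it to the caller.
    static byte[] toArray(ByteBuffer buffer) {
        byte[] bytes = new byte[buffer.remaining()];
        buffer.duplicate().get(bytes);   // duplicate() so the original position is untouched
        return bytes;
    }

    public static void main(String[] args) {
        ByteBuffer value = ByteBuffer.wrap("poison pill".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(toArray(value), StandardCharsets.UTF_8));
    }
}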

Thanks,
Kirk

> On Apr 23, 2024, at 12:45 AM, Frédérik Rouleau 
>  wrote:
> 
> Hi Andrew,
> 
> A1. I will change the order of arguments to match.
> A2 and A3, Yes the KIP is not updated yet as I do not have a wiki account.
> So I must rely on some help to do those changes, which add some delay. I
> will try to find someone available ASAP.
> A4. I had the same thought. Using keyBuffer and valueBuffer for the
> constructor seems fine for me. If no one disagrees, I will do that change.
> 
> Thanks,
> Fred



[jira] [Created] (KAFKA-16608) AsyncKafkaConsumer doesn't honour interrupted thread status on KafkaConsumer.poll(Duration)

2024-04-23 Thread Andrew Schofield (Jira)
Andrew Schofield created KAFKA-16608:


 Summary: AsyncKafkaConsumer doesn't honour interrupted thread 
status on KafkaConsumer.poll(Duration)
 Key: KAFKA-16608
 URL: https://issues.apache.org/jira/browse/KAFKA-16608
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 3.8.0
Reporter: Andrew Schofield


The expected behaviour for KafkaConsumer.poll(Duration) when the calling thread is 
in the interrupted state is to throw InterruptException. The AsyncKafkaConsumer 
doesn't do this; it only throws that exception if the interruption occurs while 
it is waiting.
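
A minimal sketch of the expected contract (not a test from the Kafka code base; the broker address and topic name are assumptions for illustration):

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.InterruptException;

public class InterruptedPollSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "interrupt-check");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            consumer.subscribe(Collections.singletonList("test-topic"));
            // Enter poll() with the interrupted status already set: the expected
            // behaviour is an immediate InterruptException, not a full wait.
            Thread.currentThread().interrupt();
            consumer.poll(Duration.ofSeconds(5));
            System.out.println("poll() ignored the interrupted thread status");
        } catch (InterruptException expected) {
            System.out.println("poll() honoured the interrupted thread status");
        } finally {
            Thread.interrupted();   // clear the flag so close() is not interrupted as well
            consumer.close();
        }
    }
}
{code}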



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-04-23 Thread Knowles Atchison Jr
Alieh,

Having run into issues with not being able to handle producer failures, I
think this is good functionality to have.

With this new functionality proposed at the Producer level, how would
ecosystems that sit on top of it function? Specifically, Connect was
updated a few years ago to allow Source Connect Workers to handle producer
exceptions that would never succeed when the source data was bad.

Knowles

On Tue, Apr 23, 2024 at 5:23 AM Alieh Saeedi 
wrote:

> Thanks Matthias. I changed it to `custom.exception.handler`
>
> Alieh
>
>
> On Tue, Apr 23, 2024 at 8:47 AM Matthias J. Sax  wrote:
>
> > Thanks Alieh!
> >
> > A few nits:
> >
> >
> > 1) The new config we add for the producer should be mentioned in the
> > "Public Interfaces" section.
> >
> > 2) Why do we use `producer.` prefix for a *producer* config? Should it
> > be `exception.handler` only?
> >
> >
> > -Matthias
> >
> > On 4/22/24 7:38 AM, Alieh Saeedi wrote:
> > > Thank you all for the feedback!
> > >
> > > Addressing the main concern: The KIP is about giving the user the
> ability
> > > to handle producer exceptions, but to be more conservative and avoid
> > future
> > > issues, we decided to be limited to a short list of exceptions. I
> > included
> > > *RecordTooLargeException* and *UnknownTopicOrPartitionException*. Open
> to
> > > suggestion for adding some more ;-)
> > >
> > > KIP Updates:
> > > - clarified the way that the user should configure the Producer to use
> > the
> > > custom handler. I think adding a producer config property is the
> cleanest
> > > one.
> > > - changed the *ClientExceptionHandler* to *ProducerExceptionHandler* to
> > be
> > > closer to what we are changing.
> > > - added the ProducerRecord as the input parameter of the handle()
> method
> > as
> > > well.
> > > - increased the response types to 3 to have fail and two types of
> > continue.
> > > - The default behaviour is having no custom handler, having the
> > > corresponding config parameter set to null. Therefore, the KIP provides
> > no
> > > default implementation of the interface.
> > > - We follow the interface solution as described in the
> > > Rejected Alternatives section.
> > >
> > >
> > > Cheers,
> > > Alieh
> > >
> > >
> > > On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax 
> > wrote:
> > >
> > >> Thanks for the KIP Alieh! It addresses an important case for error
> > >> handling.
> > >>
> > >> I agree that using this handler would be an expert API, as mentioned
> by
> > >> a few people. But I don't think it would be a reason to not add it.
> It's
> > >> always a tricky tradeoff what to expose to users and to avoid foot
> guns,
> > >> but we added similar handlers to Kafka Streams, and have good
> experience
> > >> with it. Hence, I understand, but don't share the concern raised.
> > >>
> > >> I also agree that there is some responsibility by the user to
> understand
> > >> how such a handler should be implemented to not drop data by accident.
> > >> But it seem unavoidable and acceptable.
> > >>
> > >> While I understand that a "simpler / reduced" API (eg via configs)
> might
> > >> also work, I personally prefer a full handler. Configs have the same
> > >> issue that they could be miss-used potentially leading to incorrectly
> > >> dropped data, but at the same time are less flexible (and thus maybe
> > >> ever harder to use correctly...?). Base on my experience, there is
> also
> > >> often weird corner case for which it make sense to also drop records
> for
> > >> other exceptions, and a full handler has the advantage of full
> > >> flexibility and "absolute power!".
> > >>
> > >> To be fair: I don't know the exact code paths of the producer in
> > >> details, so please keep me honest. But my understanding is, that the
> KIP
> > >> aims to allow users to react to internal exception, and decide to keep
> > >> retrying internally, swallow the error and drop the record, or raise
> the
> > >> error?
> > >>
> > >> Maybe the KIP would need to be a little bit more precises what error
> we
> > >> want to cover -- I don't think this list must be exhaustive, as we can
> > >> always do follow up KIP to also apply the handler to other errors to
> > >> expand the scope of the handler. The KIP does mention examples, but it
> > >> might be good to explicitly state for what cases the handler gets
> > applied?
> > >>
> > >> I am also not sure if CONTINUE and FAIL are enough options? Don't we
> > >> need three options? Or would `CONTINUE` have different meaning
> depending
> > >> on the type of error? Ie, for a retryable error `CONTINUE` would mean
> > >> keep retrying internally, but for a non-retryable error `CONTINUE`
> means
> > >> swallow the error and drop the record? This semantic overload seems
> > >> tricky to reason about by users, so it might better to split
> `CONTINUE`
> > >> into two cases -> `RETRY` and `SWALLOW` (or some better names).
> > >>
> > >> Additionally, should we just ship a `DefaultClientExceptionHandler`
> > >> which would return 

[jira] [Resolved] (KAFKA-16462) New consumer fails with timeout in security_test.py system test

2024-04-23 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True resolved KAFKA-16462.
---
Resolution: Duplicate

> New consumer fails with timeout in security_test.py system test
> ---
>
> Key: KAFKA-16462
> URL: https://issues.apache.org/jira/browse/KAFKA-16462
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{security_test.py}} system test fails with the following error:
> {noformat}
> test_id:
> kafkatest.tests.core.security_test.SecurityTest.test_client_ssl_endpoint_validation_failure.security_protocol=SSL.interbroker_security_protocol=PLAINTEXT.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   1 minute 30.885 seconds
> TimeoutError('')
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/security_test.py",
>  line 142, in test_client_ssl_endpoint_validation_failure
> wait_until(lambda: self.producer_consumer_have_expected_error(error), 
> timeout_sec=30)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
>  line 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16464) New consumer fails with timeout in replication_replica_failure_test.py system test

2024-04-23 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True resolved KAFKA-16464.
---
Resolution: Duplicate

> New consumer fails with timeout in replication_replica_failure_test.py system 
> test
> --
>
> Key: KAFKA-16464
> URL: https://issues.apache.org/jira/browse/KAFKA-16464
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{replication_replica_failure_test.py}} system test fails with the 
> following error:
> {noformat}
> test_id:
> kafkatest.tests.core.replication_replica_failure_test.ReplicationReplicaFailureTest.test_replication_with_replica_failure.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   1 minute 20.972 seconds
> TimeoutError('Timed out after 30s while awaiting initial record delivery 
> of 5 records')
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/replication_replica_failure_test.py",
>  line 97, in test_replication_with_replica_failure
> self.await_startup()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/end_to_end.py",
>  line 125, in await_startup
> (timeout_sec, min_records))
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
>  line 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: Timed out after 30s while awaiting initial 
> record delivery of 5 records
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-932: Queues for Kafka

2024-04-23 Thread Jun Rao
Hi, Andrew,

Thanks for the reply.

123. "it doesn’t need to confirm the assignment back to the GC."
  Hmm, I thought the member needs to confirm the assignment to GC to
avoid GC including the assignment in the heartbeat response continuously. I
assume this is done by including the new group epoch in the heartbeat
response.

125. It's possible that the share partition has never been initialized when
AlterShareGroupOffsets is called. If GC doesn't write
ShareGroupPartitionMetadata and the GC fails over, it would reinitialize
the share partition and lose the effect of AlterShareGroupOffsets. If the
partition has already been initialized and it's recorded
in ShareGroupPartitionMetadata, it's possible not to write
ShareGroupPartitionMetadata again when handling AlterShareGroupOffsets.

138. Could we add the flow in GC when a topic is deleted?

139. Do we need to add any metrics in KafkaShareConsumer?

54. "I don’t think there is any redundancy. The ShareGroupMemberMetadata
does include a list of subscribed topics. However, if there is a period of
time in which no members are subscribed to a particular topic, it does not
mean that the topic’s ShareGroupState should be immediately removed, but it
does mean that there will be no ShareGroupMemberMetadata records containing
that topic."
  I am still trying to understand the value of InitializingTopics and
DeletingTopics in ShareGroupPartitionMetadataValue. They are used to
remember the intention of an operation. However, GC still needs to handle
the case when the intention is not safely recorded. If GC wants to
initialize a new topic/partition, a simpler approach is for it to send an
InitializeShareGroupState to the share coordinator and after receiving a
success response, write ShareGroupPartitionMetadataValue with the
initialized partition included in InitializedTopics. This saves a record
write. It's possible for GC to fail in between the two steps. On failover,
the new GC just repeats the process. The case that you mentioned above can
still be achieved. If a partition is in InitializedTopics of
ShareGroupPartitionMetadataValue, but no member subscribes to it, we can
still keep the ShareGroupState as long as the topic still exists. The same
optimization could be applied to DeletingTopics too.

Jun


On Tue, Apr 23, 2024 at 3:57 AM Andrew Schofield 
wrote:

> Hi Jun,
> Thanks for the reply.
>
> 123. Every time the GC fails over, it needs to recompute the assignment
> for every member. However, the impact of re-assignment is not that onerous.
> If the recomputed assignments are the same, which they may well be, there
> is no impact on the members at all.
>
> On receiving the new assignment, the member adjusts the topic-partitions
> in its share sessions, removing those which were revoked and adding those
> which were assigned. It is able to acknowledge the records it fetched from
> the partitions which have just been revoked, and it doesn’t need to confirm
> the assignment back to the GC.
>
> 125. I don’t think the GC needs to write ShareGroupPartitionMetadata
> when processing AlterShareGroupOffsets. This is because the operation
> happens as a result of an explicit administrative action and it is possible
> to return a specific error code for each topic-partition. The cases where
> ShareGroupPartitionMetadata is used are when a topic is added or removed
> from the subscribed topics, or the number of partitions changes.
>
> 130. I suppose that limits the minimum lock timeout for a cluster to
> prevent
> a group from having an excessively low value. Config added.
>
> 131. I have changed it to group.share.partition.max.record.locks.
>
> 136.  When GC failover occurs, the GC gaining ownership of a partition of
> the __consumer_offsets topic replays the records to build its state.
> In the case of a share group, it learns:
>
> * The share group and its group epoch (ShareGroupMetadata)
> * The list of members (ShareGroupMemberMetadata)
> * The list of share-partitions (ShareGroupPartitionMetadata)
>
> It will recompute the assignments in order to respond to
> ShareGroupHeartbeat requests. As a result, it bumps the group epoch.
>
> I will update the KIP accordingly to confirm the behaviour.
>
> 137.1: The GC and the SPL report the metrics in the
> group-coordinator-metrics
> group. Unlike consumer groups in which the GC performs offset commit,
> the share group equivalent is performed by the SPL. So, I’ve grouped the
> concepts which relate to the group in group-coordinator-metrics.
>
> The SC reports the metrics in the share-coordinator-metrics group.
>
> 137.2: There is one metric in both groups - partition-load-time. In the SC
> group,
> it refers to the time loading data from the share-group state topic so that
> a ReadShareGroupState request can be answered. In the GC group,
> it refers to the time to read the state from the persister. Apart from the
> interbroker RPC latency of the read, they’re likely to be very close.
>
> Later, for a cluster which is using a 

[jira] [Created] (KAFKA-16607) Update the KIP and metrics implementation to include the new state

2024-04-23 Thread Jira
José Armando García Sancio created KAFKA-16607:
--

 Summary: Update the KIP and metrics implementation to include the 
new state
 Key: KAFKA-16607
 URL: https://issues.apache.org/jira/browse/KAFKA-16607
 Project: Kafka
  Issue Type: Sub-task
  Components: kraft
Reporter: José Armando García Sancio


KafkaRaftMetrics exposes a current-state metric that needs to be updated to 
include the prospective state.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Jenkins build is unstable: Kafka » Kafka Branch Builder » 3.7 #144

2024-04-23 Thread Apache Jenkins Server
See 




Re: [VOTE] KIP-853: KRaft Controller Membership Changes

2024-04-23 Thread José Armando García Sancio
Hi all,

I am closing the voting. The KIP passed with:
Jun Rao +1 binding
Jason Gustafson +1 binding
José Armando García Sancio +1 binding

Thank you all,

On Mon, Apr 22, 2024 at 11:57 AM José Armando García Sancio
 wrote:
>
> I am going to close the vote tomorrow morning (PST).
>
> On Mon, Apr 22, 2024 at 10:06 AM José Armando García Sancio
>  wrote:
> >
> > +1 binding.
> >
> > On Mon, Apr 22, 2024 at 9:28 AM Jason Gustafson
> >  wrote:
> > >
> > > Thanks Jose. +1. Great KIP!
> > >
> > > On Fri, Mar 29, 2024 at 11:16 AM Jun Rao  
> > > wrote:
> > >
> > > > Hi, Jose,
> > > >
> > > > Thanks for the KIP. +1
> > > >
> > > > Jun
> > > >
> > > > On Fri, Mar 29, 2024 at 9:55 AM José Armando García Sancio
> > > >  wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I would like to call a vote to adopt KIP-853.
> > > > >
> > > > > KIP: https://cwiki.apache.org/confluence/x/nyH1D
> > > > > Discussion thread:
> > > > > https://lists.apache.org/thread/6o3sjvcb8dx1ozqfpltb7p0w76b4nd46
> > > > >
> > > > > Thanks,
> > > > > --
> > > > > -José
> > > > >
> > > >
> >
> >
> >
> > --
> > -José
>
>
>
> --
> -José



-- 
-José


[jira] [Resolved] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version

2024-04-23 Thread Jakub Scholz (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jakub Scholz resolved KAFKA-16606.
--
Resolution: Not A Problem

> JBOD support in KRaft does not seem to be gated by the metadata version
> ---
>
> Key: KAFKA-16606
> URL: https://issues.apache.org/jira/browse/KAFKA-16606
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Jakub Scholz
>Priority: Major
>
> JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka 
> [source 
> code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195]
>  suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. 
> However, it seems to be possible to run KRaft cluster with JBOD even with 
> older metadata versions such as {{{}3.6{}}}. For example, I have a cluster 
> using the {{3.6}} metadata version:
> {code:java}
> bin/kafka-features.sh --bootstrap-server localhost:9092 describe
> Feature: metadata.version       SupportedMinVersion: 3.0-IV1    
> SupportedMaxVersion: 3.7-IV4    FinalizedVersionLevel: 3.6-IV2  Epoch: 1375 
> {code}
> Yet a KRaft cluster with JBOD seems to run fine:
> {code:java}
> bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe
> Querying brokers for log directories information
> Received log directory information from brokers 2000,3000,1000
> 

[jira] [Created] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version

2024-04-23 Thread Jakub Scholz (Jira)
Jakub Scholz created KAFKA-16606:


 Summary: JBOD support in KRaft does not seem to be gated by the 
metadata version
 Key: KAFKA-16606
 URL: https://issues.apache.org/jira/browse/KAFKA-16606
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.7.0
Reporter: Jakub Scholz


JBOD support in KRaft should be available since Kafka 3.7.0. The Kafka [source 
code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195]
 suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. 
However, it seems to be possible to run KRaft cluster with JBOD even with older 
metadata versions such as {{{}3.6{}}}. For example, I have a cluster using the 
{{3.6}} metadata version:
{code:java}
bin/kafka-features.sh --bootstrap-server localhost:9092 describe
Feature: metadata.version       SupportedMinVersion: 3.0-IV1    
SupportedMaxVersion: 3.7-IV4    FinalizedVersionLevel: 3.6-IV2  Epoch: 1375 
{code}
Yet a KRaft cluster with JBOD seems to run fine:
{code:java}
bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe
Querying brokers for log directories information
Received log directory information from brokers 2000,3000,1000

[jira] [Created] (KAFKA-16605) Fix the flaky LogCleanerParameterizedIntegrationTest

2024-04-23 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-16605:


 Summary: Fix the flaky LogCleanerParameterizedIntegrationTest
 Key: KAFKA-16605
 URL: https://issues.apache.org/jira/browse/KAFKA-16605
 Project: Kafka
  Issue Type: Task
Reporter: Kamal Chandraprakash
Assignee: Kamal Chandraprakash






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16604) Deprecate ConfigDef.ConfigKey constructor from public APIs

2024-04-23 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-16604:
-

 Summary: Deprecate ConfigDef.ConfigKey constructor from public APIs
 Key: KAFKA-16604
 URL: https://issues.apache.org/jira/browse/KAFKA-16604
 Project: Kafka
  Issue Type: Improvement
Reporter: Sagar Rao


Currently, one can create a ConfigKey either by invoking the public constructor 
directly and passing it to a ConfigDef object, or by invoking one of the define 
methods. The two ways can get confusing at times. Moreover, it can lead 
to errors, as was noticed in KAFKA-16592.

We should ideally expose only one way to users, which IMO should be to 
create the objects only through the exposed define methods. This ticket is 
about marking the public constructor of ConfigKey as Deprecated first and then 
making it private eventually.
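
For reference, a sketch of the define() path that this ticket would standardise on (the config name and values below are made up):

{code:java}
import org.apache.kafka.common.config.ConfigDef;

public class DefineVsConstructorSketch {
    public static void main(String[] args) {
        // define(...) builds the ConfigDef.ConfigKey internally, so the caller never
        // has to get the long positional ConfigKey constructor argument list right.
        ConfigDef def = new ConfigDef()
            .define("example.setting", ConfigDef.Type.STRING, "default-value",
                    ConfigDef.Importance.MEDIUM, "An example setting, for illustration only.");

        System.out.println(def.configKeys().get("example.setting").documentation);
    }
}
{code}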



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-932: Queues for Kafka

2024-04-23 Thread Andrew Schofield
Hi Jun,
Thanks for the reply.

123. Every time the GC fails over, it needs to recompute the assignment
for every member. However, the impact of re-assignment is not that onerous.
If the recomputed assignments are the same, which they may well be, there
is no impact on the members at all.

On receiving the new assignment, the member adjusts the topic-partitions
in its share sessions, removing those which were revoked and adding those
which were assigned. It is able to acknowledge the records it fetched from
the partitions which have just been revoked, and it doesn’t need to confirm
the assignment back to the GC.

125. I don’t think the GC needs to write ShareGroupPartitionMetadata
when processing AlterShareGroupOffsets. This is because the operation
happens as a result of an explicit administrative action and it is possible
to return a specific error code for each topic-partition. The cases where
ShareGroupPartitionMetadata is used are when a topic is added or removed
from the subscribed topics, or the number of partitions changes.

130. I suppose that limits the minimum lock timeout for a cluster to prevent
a group from having an excessively low value. Config added.

131. I have changed it to group.share.partition.max.record.locks.

136.  When GC failover occurs, the GC gaining ownership of a partition of
the __consumer_offsets topic replays the records to build its state.
In the case of a share group, it learns:

* The share group and its group epoch (ShareGroupMetadata)
* The list of members (ShareGroupMemberMetadata)
* The list of share-partitions (ShareGroupPartitionMetadata)

It will recompute the assignments in order to respond to
ShareGroupHeartbeat requests. As a result, it bumps the group epoch.

I will update the KIP accordingly to confirm the behaviour.

137.1: The GC and the SPL report the metrics in the group-coordinator-metrics
group. Unlike consumer groups in which the GC performs offset commit,
the share group equivalent is performed by the SPL. So, I’ve grouped the
concepts which relate to the group in group-coordinator-metrics.

The SC reports the metrics in the share-coordinator-metrics group.

137.2: There is one metric in both groups - partition-load-time. In the SC 
group,
it refers to the time loading data from the share-group state topic so that
a ReadShareGroupState request can be answered. In the GC group,
it refers to the time to read the state from the persister. Apart from the
interbroker RPC latency of the read, they’re likely to be very close.

Later, for a cluster which is using a custom persister, the share-coordinator
metrics would likely not be reported, and the persister would have its own
metrics.

137.3: Correct. Fixed.

137.4: Yes, it does include the time to write to the internal topic.
I’ve tweaked the description.

Thanks,
Andrew

> On 22 Apr 2024, at 20:04, Jun Rao  wrote:
> 
> Hi, Andrew,
> 
> Thanks for the reply.
> 
> 123. "The share group does not persist the target assignment."
>  What's the impact of this? Everytime that GC fails over, it needs to
> recompute the assignment for every member. Do we expect the member
> assignment to change on every GC failover?
> 
> 125. Should the GC also write ShareGroupPartitionMetadata?
> 
> 127. So, group epoch is only propagated to SC when
> InitializeShareGroupState request is sent. This sounds good.
> 
> 130. Should we have a group.share.min.record.lock.duration.ms to pair with
> group.share.max.record.lock.duration.ms?
> 
> 131. Sounds good. The name group.share.record.lock.partition.limit doesn't
> seem very intuitive. How about something
> like group.share.partition.max.records.pending.ack?
> 
> 136. Could we describe the process of GC failover? I guess it needs to
> compute member reassignment and check if there is any new topic/partition
> matching the share subscription. Does it bump up the group epoch?
> 
> 137. Metrics:
> 137.1 It would be useful to document who reports each metric. Is it any
> broker, GC, SC or SPL?
> 137.2 partition-load-time: Is that the loading time at SPL or SC?
> 137.3 "The time taken in milliseconds to load the share-group state from
> the share-group state partitions loaded in the last 30 seconds."
>  The window depends on metrics.num.samples and metrics.sample.window.ms
> and is not always 30 seconds, right?
> 137.4 Could you explain write/write-latency a bit more? Does it include the
> time to write to the internal topic?
> 
> Jun
> 
> On Mon, Apr 22, 2024 at 2:57 AM Andrew Schofield 
> wrote:
> 
>> Hi Jun,
>> Thanks for your comments.
>> 
>> 120. Thanks. Fixed.
>> 
>> 121. ShareUpdateValue.SnapshotEpoch indicates which snapshot
>> the update applies to. It should of course be the snapshot that precedes
>> it in the log. It’s just there to provide a consistency check.
>> 
>> I also noticed that ShareSnapshotValue was missing StateEpoch. It
>> isn’t any more.
>> 
>> 122. In KIP-848, ConsumerGroupMemberMetadataValue includes
>> GroupEpoch, but in the code it does not. In fact, there is 

Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-04-23 Thread Alieh Saeedi
Thanks Matthias. I changed it to `custom.exception.handler`

Alieh
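
(For illustration, wiring the proposed property into the producer configs might look roughly like the sketch below; the config name comes from this thread and the handler class is a placeholder, so neither exists in a released Kafka version.)

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class HandlerConfigSketch {
    static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Proposed in this thread; not an existing ProducerConfig constant today.
        props.put("custom.exception.handler", "com.example.MyProducerExceptionHandler");
        return props;
    }
}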


On Tue, Apr 23, 2024 at 8:47 AM Matthias J. Sax  wrote:

> Thanks Alieh!
>
> A few nits:
>
>
> 1) The new config we add for the producer should be mentioned in the
> "Public Interfaces" section.
>
> 2) Why do we use `producer.` prefix for a *producer* config? Should it
> be `exception.handler` only?
>
>
> -Matthias
>
> On 4/22/24 7:38 AM, Alieh Saeedi wrote:
> > Thank you all for the feedback!
> >
> > Addressing the main concern: The KIP is about giving the user the ability
> > to handle producer exceptions, but to be more conservative and avoid
> future
> > issues, we decided to be limited to a short list of exceptions. I
> included
> > *RecordTooLargeException* and *UnknownTopicOrPartitionException*. Open to
> > suggestion for adding some more ;-)
> >
> > KIP Updates:
> > - clarified the way that the user should configure the Producer to use
> the
> > custom handler. I think adding a producer config property is the cleanest
> > one.
> > - changed the *ClientExceptionHandler* to *ProducerExceptionHandler* to
> be
> > closer to what we are changing.
> > - added the ProducerRecord as the input parameter of the handle() method
> as
> > well.
> > - increased the response types to 3 to have fail and two types of
> continue.
> > - The default behaviour is having no custom handler, having the
> > corresponding config parameter set to null. Therefore, the KIP provides
> no
> > default implementation of the interface.
> > - We follow the interface solution as described in the
> > Rejected Alternatives section.
> >
> >
> > Cheers,
> > Alieh
> >
> >
> > On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax 
> wrote:
> >
> >> Thanks for the KIP Alieh! It addresses an important case for error
> >> handling.
> >>
> >> I agree that using this handler would be an expert API, as mentioned by
> >> a few people. But I don't think it would be a reason to not add it. It's
> >> always a tricky tradeoff what to expose to users and to avoid foot guns,
> >> but we added similar handlers to Kafka Streams, and have good experience
> >> with it. Hence, I understand, but don't share the concern raised.
> >>
> >> I also agree that there is some responsibility by the user to understand
> >> how such a handler should be implemented to not drop data by accident.
> >> But it seem unavoidable and acceptable.
> >>
> >> While I understand that a "simpler / reduced" API (eg via configs) might
> >> also work, I personally prefer a full handler. Configs have the same
> >> issue that they could be miss-used potentially leading to incorrectly
> >> dropped data, but at the same time are less flexible (and thus maybe
> >> ever harder to use correctly...?). Base on my experience, there is also
> >> often weird corner case for which it make sense to also drop records for
> >> other exceptions, and a full handler has the advantage of full
> >> flexibility and "absolute power!".
> >>
> >> To be fair: I don't know the exact code paths of the producer in
> >> details, so please keep me honest. But my understanding is, that the KIP
> >> aims to allow users to react to internal exception, and decide to keep
> >> retrying internally, swallow the error and drop the record, or raise the
> >> error?
> >>
> >> Maybe the KIP would need to be a little bit more precises what error we
> >> want to cover -- I don't think this list must be exhaustive, as we can
> >> always do follow up KIP to also apply the handler to other errors to
> >> expand the scope of the handler. The KIP does mention examples, but it
> >> might be good to explicitly state for what cases the handler gets
> applied?
> >>
> >> I am also not sure if CONTINUE and FAIL are enough options? Don't we
> >> need three options? Or would `CONTINUE` have different meaning depending
> >> on the type of error? Ie, for a retryable error `CONTINUE` would mean
> >> keep retrying internally, but for a non-retryable error `CONTINUE` means
> >> swallow the error and drop the record? This semantic overload seems
> >> tricky to reason about by users, so it might better to split `CONTINUE`
> >> into two cases -> `RETRY` and `SWALLOW` (or some better names).
> >>
> >> Additionally, should we just ship a `DefaultClientExceptionHandler`
> >> which would return `FAIL`, for backward compatibility. Or don't have any
> >> default handler to begin with and allow it to be `null`? I don't see the
> >> need for a specific `TransactionExceptionHandler`. To me, the goal
> >> should be to not modify the default behavior at all, but to just allow
> >> users to change the default behavior if there is a need.
> >>
> >> What is missing on the KIP though it, how the handler is passed into the
> >> producer thought? Would we need a new config which allows to set a
> >> custom handler? And/or would we allow to pass in an instance via the
> >> constructor or add a new method to set a handler?
> >>
> >>
> >> -Matthias
> >>
> >> On 4/18/24 10:02 AM, Andrew Schofield wrote:
> >>> 

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-23 Thread Damien Gasparina
Hi Matthias,

Thanks for your reply!

100) That's a good point, I struggle to think of a case where it makes
sense, but I am sure it exists. Let's keep this one for a future KIP
as I think it would not be trivial to implement & test.

101) As mentioned in KIP-1034, the error handler metadata could be
held longer due to the production exception handler, thus I think it
makes sense to have an independent container class here; I think
ErrorHandlerContext sounds like a good idea, it is clear that it is
only used for error handler and should not be reused / expanded in
other locations.

102) I included TaskId as it is currently available in the
ProcessorContext; it is mostly there for backward-compatibility reasons. The
only extra metadata TaskId adds is the subtopology and the topologyName
(a future piece of metadata?). I think it makes sense to keep it.

103) Oh, good point, I didn't realize it. Let me update the KIP to
ensure immutability here.

104) Good point, now that you mention it, it makes sense to have it in
the Impl class. I guess we could simply cast the value in the default
implementation.

105) I just wanted to make it explicit that this method should not be
implemented and is deprecated. I specified a default for this method
for users to have the possibility to not add this method while
implementing the interface. The end goal is to delete the previous
signature at some point.

106) It was mostly for backward compatibility: if we just rely on a
single "handle" method for both, wouldn't we risk breaking backward
compatibility for existing implementations of the
ProductionExceptionHandler? Thinking about it, since we introduce a new
method, maybe we could play with the default implementation to
dispatch to the previous handle/handleSerialization according to the kind
of exception, along the lines of the sketch below. I will think about
unifying; I do agree it would be cleaner.
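
A rough sketch of that dispatch idea; ErrorHandlerContext and the new overload only exist as shapes discussed in this KIP, and the simplified signatures below are assumptions rather than the current Kafka Streams interface:

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;

interface ErrorHandlerContext { }   // placeholder for the context type proposed in KIP-1033

interface ProductionExceptionHandlerSketch {

    enum Response { CONTINUE, FAIL }

    // Simplified stand-ins for the existing callbacks.
    Response handle(ProducerRecord<byte[], byte[]> record, Exception exception);

    Response handleSerializationException(ProducerRecord<?, ?> record, Exception exception);

    // A single new entry point that dispatches to the older callbacks based on the
    // kind of exception, so existing implementations keep working unchanged.
    default Response handle(ErrorHandlerContext context,
                            ProducerRecord<byte[], byte[]> record,
                            Exception exception) {
        if (exception instanceof SerializationException) {
            return handleSerializationException(record, exception);
        }
        return handle(record, exception);
    }
}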

107) Same reason as before, to ensure backward compatibility, and to
remove the need to implement the deprecated method while implementing
the interface. The default implementation that throws a
NotImplementedException is just there to send a message :)

108) Good point, let me rename it.

109) We didn't really think about the TimestampExtractor interface
initially. Let me analyze it a bit more thoroughly. I think we would
need to catch potential exceptions in one of the handlers, but not
sure which one makes the most sense.

Cheers,
Damien

On Tue, 23 Apr 2024 at 06:15, Matthias J. Sax  wrote:
>
> Thanks for all the updates. Great discussion.
>
> Few follow up questions from my side:
>
> 99: I agree with Damien about Bruno's point (2). We should not return
> `Record` (cf point 103 below why).
>
>
> 100: Can we imagine a case, for which the `ProcessingExceptionHandler`
> would want/need to have access to a Processor's state stores? When the
> handler is executed, we are technically still in the right context, and
> could maybe allow accessing the state store. Just a wild idea; also
> don't want to enlarge the scope unnecessarily, and we might be able to
> do this in a follow up KIP, too, if we believe it would be useful. I
> thought I'd just throw out the idea for completeness.
>
>
> 101: Does the name `ErrorHandlerContext` align to what Sophie had in
> mind about using this interface somewhere else?
>
>
> 102 `ErrorHandlerContext`: Is it intentional to have both `partition()`
> and `taskId()` -- the `TaskId` encodes the partition implicitly, so it's
> kinda redundant to also have `partition()`. Don't feel strongly about
> it, but might be worth to call out in the KIP why both are added.
>
>
> 103 `ErrorHandlerContext#header`: the return type is `Headers` which
> does not ensure immutability. I believe we need to introduce new
> `ReadOnlyHeaders` (or maybe some better name) interface...
>
>
> 104 `ErrorHandlerContext#convertToProcessorContext()`: I understand why
> this method was added, but I don't think that's the right approach to
> handle this case. We should not add this leaky abstraction IMHO, but
> instead add this method to a `DefaultImpl` class, and add a cast into
> the implementation from the interface to the class to access it. (Not
> 100% sure about the details how to setup the code, so it would be great
> to get a POC PR up to see how we can do this w/o the need to add this
> method to the interface.)
>
>
> 105 `ProductionExceptionHandler`: why does the existing method get a
> default implementation that throws an exception? Would be good to
> clarify in the KIP why this change in necessary in this way. -- Could we
> also let it `return FAIL` instead?
>
>
> 106 `ProductionExceptionHandler`: why do we add two new methods? IIRC,
> we added `handleSerializationException(...)` only because we could not
> re-use the existing `handle(...)` method (cf KIP-399:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-399%3A+Extend+ProductionExceptionHandler+to+cover+serialization+exceptions).
> I think it's sufficient to only add a single new method which
> "blends/unifies" both 

Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-04-23 Thread Frédérik Rouleau
Hi Andrew,

A1. I will change the order of arguments to match.
A2 and A3: Yes, the KIP is not updated yet as I do not have a wiki account,
so I must rely on some help to make those changes, which adds some delay. I
will try to find someone available ASAP.
A4. I had the same thought. Using keyBuffer and valueBuffer for the
constructor seems fine for me. If no one disagrees, I will do that change.

Thanks,
Fred


Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-04-23 Thread Andrew Schofield
Hi Fred,
Just reviewing the KIP again now that the discussion has quietened down a 
little.
It will soon be ready for a vote I think. I have a few comments about details.

A1) The signature of the new constructor for RecordDeserializationException
needs to be updated according to the discussion. I see that you have a PR which
is much closer to what I expected. Personally, I think the arguments for the 
constructor
which represent the portions of the record should match the order for the 
constructor
of ConsumerRecord. We’ve already worked out the order of these things once so I
would go for consistency. I suggest:

public RecordDeserializationException(
TopicPartition partition,
long offset,
long timestamp,
TimestampType timestampType,
ByteBuffer key,
ByteBuffer value,
Headers headers,
String message,
Throwable cause);

A2) There are still references to the Record class in the KIP, but we decided 
not
to use it.

A3) There is also a reference to a getConsumerRecord() method which is now to
be replaced by individual methods for the portions of the record, such as:
byte[] value();

The KIP should have a complete and accurate description of the Java interface
changes so please fill in the details.

A4) Given that the key and value are provided to the constructor as ByteBuffer 
but
lazily converted into byte[] as required, I wonder whether the names of the 
methods
and the constructor arguments should be slightly different, just a keyBuffer 
for the
constructor and key() for the getter. Maybe you prefer to keep them the same 
and
I’m happy with that. Just offering a suggestion.
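
For context, a sketch of what an application can do with the exception today; only topicPartition() and offset() are available on the released class, and the accessors discussed here (key(), value(), headers()) are what would let the raw record be logged or dead-lettered at the marked spot:

import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.RecordDeserializationException;

public final class PoisonPillSkipper {

    // Poll, skipping any record that cannot be deserialized.
    static ConsumerRecords<String, String> pollSkippingBadRecords(Consumer<String, String> consumer) {
        while (true) {
            try {
                return consumer.poll(Duration.ofSeconds(1));
            } catch (RecordDeserializationException e) {
                // With this KIP, e.key()/e.value()/e.headers() could be inspected here
                // before deciding what to do with the poison pill.
                consumer.seek(e.topicPartition(), e.offset() + 1);
            }
        }
    }
}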


Thanks for the KIP. I think it’s a worthwhile improvement and I expect it’s 
almost there.

Thanks,
Andrew



> On 19 Apr 2024, at 18:59, Frédérik Rouleau  
> wrote:
>
> Hi everyone,
>
> Thanks for all that valuable feedback.
> So we have a consensus not to use Record.
>
> I have updated the PR by creating 2 child classes
> KeyDeserializationException and ValueDeserializationException. Those
> classes directly embed the required fields. I do not think a wrapper object
> would be useful in that case.
> I still had to update checkstyle as Headers class is not allowed for import
> in the Errors package. I do not think it's an issue to add that
> authorization as Headers is already used in consumerRecord, so already
> public.
>
> The proposed PR https://github.com/apache/kafka/pull/15691/files
>
> If it's ok I will update the KIP.
>
> Regards,
> Fred



Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-04-23 Thread Matthias J. Sax

Thanks Alieh!

A few nits:


1) The new config we add for the producer should be mentioned in the 
"Public Interfaces" section.


2) Why do we use `producer.` prefix for a *producer* config? Should it 
be `exception.handler` only?



-Matthias

On 4/22/24 7:38 AM, Alieh Saeedi wrote:

Thank you all for the feedback!

Addressing the main concern: The KIP is about giving the user the ability
to handle producer exceptions, but to be more conservative and avoid future
issues, we decided to limit it to a short list of exceptions. I included
*RecordTooLargeException* and *UnknownTopicOrPartitionException*. Open to
suggestions for adding some more ;-)

KIP Updates:
- clarified the way that the user should configure the Producer to use the
custom handler. I think adding a producer config property is the cleanest
one.
- changed the *ClientExceptionHandler* to *ProducerExceptionHandler* to be
closer to what we are changing.
- added the ProducerRecord as the input parameter of the handle() method as
well.
- increased the response types to 3 to have fail and two types of continue.
- The default behaviour is to have no custom handler, i.e., the
corresponding config parameter is set to null. Therefore, the KIP provides no
default implementation of the interface.
- We follow the interface solution as described in the
Rejected Alternatives section.


Cheers,
Alieh


On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax  wrote:


Thanks for the KIP Alieh! It addresses an important case for error
handling.

I agree that using this handler would be an expert API, as mentioned by
a few people. But I don't think it would be a reason to not add it. It's
always a tricky tradeoff what to expose to users and to avoid foot guns,
but we added similar handlers to Kafka Streams, and have good experience
with it. Hence, I understand, but don't share the concern raised.

I also agree that there is some responsibility on the user to understand
how such a handler should be implemented to not drop data by accident.
But that seems unavoidable and acceptable.

While I understand that a "simpler / reduced" API (eg via configs) might
also work, I personally prefer a full handler. Configs have the same
issue that they could be misused, potentially leading to incorrectly
dropped data, but at the same time are less flexible (and thus maybe
even harder to use correctly...?). Based on my experience, there are also
often weird corner cases for which it makes sense to also drop records for
other exceptions, and a full handler has the advantage of full
flexibility and "absolute power!".

To be fair: I don't know the exact code paths of the producer in
detail, so please keep me honest. But my understanding is that the KIP
aims to allow users to react to internal exceptions, and decide to keep
retrying internally, swallow the error and drop the record, or raise the
error?

Maybe the KIP would need to be a little bit more precise about which errors
we want to cover -- I don't think this list must be exhaustive, as we can
always do a follow-up KIP to also apply the handler to other errors to
expand the scope of the handler. The KIP does mention examples, but it
might be good to explicitly state for what cases the handler gets applied?

I am also not sure if CONTINUE and FAIL are enough options? Don't we
need three options? Or would `CONTINUE` have a different meaning depending
on the type of error? Ie, for a retryable error `CONTINUE` would mean
keep retrying internally, but for a non-retryable error `CONTINUE` means
swallow the error and drop the record? This semantic overload seems
tricky for users to reason about, so it might be better to split `CONTINUE`
into two cases -> `RETRY` and `SWALLOW` (or some better names).
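
Just to sketch the idea (purely illustrative, not a proposal for the final
interface or names):

import org.apache.kafka.clients.producer.ProducerRecord;

public interface ProducerExceptionHandlerSketch {
    enum Response { RETRY, SWALLOW, FAIL }

    // Decide, per record and per exception, whether to keep retrying
    // internally, drop the record and continue, or fail the producer.
    Response handle(ProducerRecord<byte[], byte[]> record, Exception exception);
}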

Additionally, should we just ship a `DefaultClientExceptionHandler`
which would return `FAIL`, for backward compatibility? Or should we not
have any default handler to begin with and allow it to be `null`? I don't
see the need for a specific `TransactionExceptionHandler`. To me, the goal
should be to not modify the default behavior at all, but to just allow
users to change the default behavior if there is a need.

What is missing in the KIP, though, is how the handler is passed into the
producer. Would we need a new config which allows setting a custom
handler? And/or would we allow passing in an instance via the
constructor, or add a new method to set a handler?


-Matthias

On 4/18/24 10:02 AM, Andrew Schofield wrote:

Hi Alieh,
Thanks for the KIP.

Exception handling in the Kafka producer and consumer is really not ideal.
It’s even harder working out what’s going on with the consumer.

I’m a bit nervous about this KIP and I agree with Chris that it could do
with additional motivation. This would be an expert-level interface given
how complicated the exception handling for Kafka has become.

7. The application is not really aware of the batching being done on its
behalf. The ProduceResponse can actually return an array of records which
failed per batch. If 

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-23 Thread Matthias J. Sax
Thanks for splitting out this KIP. The discussion shows that it is a
complex beast by itself, so it is worth discussing on its own.



A couple of questions / comments:


100 `StateStore#commit()`: The JavaDoc says "must not be called by 
users" -- I would propose to put a guard in place for this, by either 
throwing an exception (preferable) or adding a no-op implementation (at 
least for our own stores, by wrapping them -- we cannot enforce it for 
custom stores I assume), and document this contract explicitly.
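
Something like the following is what I mean by a guard (a minimal sketch
against a made-up store interface, deliberately not the real StateStore
API, which has many more methods):

import java.util.Map;

import org.apache.kafka.common.TopicPartition;

// The user-facing wrapper delegates normal operations but rejects commit(),
// which only the runtime may call on the inner store.
interface CommittableStore {
    void put(String key, byte[] value);
    void commit(Map<TopicPartition, Long> changelogOffsets);
}

final class GuardedStore implements CommittableStore {
    private final CommittableStore inner;

    GuardedStore(final CommittableStore inner) {
        this.inner = inner;
    }

    @Override
    public void put(final String key, final byte[] value) {
        inner.put(key, value);
    }

    @Override
    public void commit(final Map<TopicPartition, Long> changelogOffsets) {
        throw new UnsupportedOperationException("commit() must not be called by users");
    }
}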



101 adding `.position` to the store: Why do we actually need this? The 
KIP says "To ensure consistency with the committed data and changelog 
offsets" but I am not sure if I can follow? Can you elaborate why 
leaving the `.position` file as-is won't work?



If it's possible at all, it will need to be done by
creating temporary StateManagers and StateStores during rebalance. I think
it is possible, and probably not too expensive, but the devil will be in
the detail.


This sounds like a significant overhead to me. We know that opening a 
single RocksDB takes about 500ms, and thus opening RocksDB to get this 
information might slow down rebalances significantly.



102: It's unclear to me how `.position` information is added. The KIP
only says: "position offsets will be stored in RocksDB, in the same
column family as the changelog offsets". Do you intend to add this
information to the map passed via `commit(final Map<TopicPartition,
Long> changelogOffsets)`? The KIP should describe this in more detail.
Also, if my assumption is correct, we might want to rename the parameter
and also have a better JavaDoc description?



103: Should we make it mandatory (long-term) that all stores (including
custom stores) manage their offsets internally? Maintaining both options
and thus both code paths puts a burden on everyone and makes the code
messy. I would strongly prefer if we could have a mid-term path to get rid
of supporting both.  -- For this case, we should deprecate the newly
added `managesOffsets()` method right away, to point out that we intend
to remove it. If it's mandatory to maintain offsets for stores, we won't
need this method any longer. In-memory stores can just return null from
#committedOffset().



104 "downgrading": I think it might be worth to add support for 
downgrading w/o the need to wipe stores? Leveraging `upgrade.from` 
parameter, we could build a two rolling bounce downgrade: (1) the new 
code is started with `upgrade.from` set to a lower version, telling the 
runtime to do the cleanup on `close()` -- (ie, ensure that all data is 
written into `.checkpoint` and `.position` file, and the newly added CL 
is deleted). In a second, rolling bounce, the old code would be able to 
open RocksDB. -- I understand that this implies much more work, but 
downgrade seems to be common enough, that it might be worth it? Even if 
we did not always support this in the past, we have the face the fact 
that KS is getting more and more adopted and as a more mature product 
should support this?
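
In config terms, the first bounce would look something like this (the exact
`upgrade.from` value to use, and the "write the legacy files on close()"
behavior, are of course exactly what this proposal would need to add):

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class DowngradeBounceSketch {
    public static void main(final String[] args) {
        // First rolling bounce: still the new code, but `upgrade.from` tells the
        // runtime to write the legacy .checkpoint/.position files (and drop the
        // new column family) on close(). The second rolling bounce then deploys
        // the old target version, which can open RocksDB again.
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "3.7"); // the version being downgraded to
        // ... build and start the KafkaStreams instance as usual
    }
}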





-Matthias







On 4/21/24 11:58 PM, Bruno Cadonna wrote:

Hi all,

How should we proceed here?

1. with the plain .checkpoint file
2. with a way to use the state store interface on unassigned but locally 
existing task state


While I like option 2, I think option 1 is less risky and will give us 
the benefits of transactional state stores sooner. We should consider 
the interface approach afterwards, though.



Best,
Bruno



On 4/17/24 3:15 PM, Bruno Cadonna wrote:

Hi Nick and Sophie,

I think the task ID is not enough to create a state store that can 
read the offsets of non-assigned tasks for lag computation during 
rebalancing. The state store also needs the state directory so that it 
knows where to find the information that it needs to return from 
changelogOffsets().


In general, I think we should proceed with the plain .checkpoint file 
for now and iterate back to the state store solution later since it 
seems it is not that straightforward. Alternatively, Nick could 
timebox an effort to better understand what would be needed for the 
state store solution. Nick, let us know your decision.


Regarding your question about the state store instance. I am not too 
familiar with that part of the code, but I think the state store is 
built when the processor topology is built, and the processor topology 
is built per stream task. So there is one instance of the processor 
topology and state store per stream task. Try to follow the call in [1].


Best,
Bruno

[1] 
https://github.com/apache/kafka/blob/f52575b17225828d2ff11996030ab7304667deab/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java#L153




On 4/16/24 8:59 PM, Nick Telford wrote:

That does make sense. The one thing I can't figure out is how per-Task
StateStore instances are constructed.

It looks like we construct one StateStore instance for the whole 
Topology
(in InternalTopologyBuilder), and pass that into