Jenkins build is unstable: Kafka » Kafka Branch Builder » trunk #2825

2024-04-18 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-16586) Test TaskAssignorConvergenceTest failing

2024-04-18 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16586:
---

 Summary: Test TaskAssignorConvergenceTest failing
 Key: KAFKA-16586
 URL: https://issues.apache.org/jira/browse/KAFKA-16586
 Project: Kafka
  Issue Type: Test
  Components: streams, unit tests
Reporter: Matthias J. Sax


{code:java}
java.lang.AssertionError: Assertion failed in randomized test. Reproduce with: `runRandomizedScenario(-538095696758490522)`.
  at org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.runRandomizedScenario(TaskAssignorConvergenceTest.java:545)
  at org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.randomClusterPerturbationsShouldConverge(TaskAssignorConvergenceTest.java:479){code}
This might expose an actual bug (or an incorrect test setup) and should be 
reproducible (did not try it myself yet).
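
For reference, a minimal sketch of a one-off reproduction (assuming {{runRandomizedScenario(long)}} is reachable from a test method added to the same class):
{code:java}
// Hedged sketch, not part of the report: a throwaway test added inside
// TaskAssignorConvergenceTest that replays the failing seed from the
// assertion message.
@Test
public void reproduceSeedFromFailureReport() {
    runRandomizedScenario(-538095696758490522L);
}
{code}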



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16280) Expose method to determine Metric Measurability

2024-04-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16280.
-
Resolution: Done

> Expose method to determine Metric Measurability
> ---
>
> Key: KAFKA-16280
> URL: https://issues.apache.org/jira/browse/KAFKA-16280
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 3.8.0
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
>  Labels: kip
> Fix For: 3.8.0
>
>
> The Jira is to track the development of KIP-1019: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1019%3A+Expose+method+to+determine+Metric+Measurability]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #2824

2024-04-18 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 369635 lines...]
[2024-04-19T00:09:45.150Z] 
[2024-04-19T00:09:45.150Z] Gradle Test Run :streams:test > Gradle Test Executor 
30 > SynchronizedPartitionGroupTest > testUpdatePartitions() STARTED
[2024-04-19T00:09:45.150Z] 
[2024-04-19T00:09:45.150Z] Gradle Test Run :streams:test > Gradle Test Executor 
30 > SynchronizedPartitionGroupTest > testUpdatePartitions() PASSED
[2024-04-19T00:09:45.150Z] 
[2024-04-19T00:09:45.150Z] Gradle Test Run :streams:test > Gradle Test Executor 
30 > RackAwareGraphConstructorFactoryTest > shouldReturnMinCostConstructor() 
STARTED
[2024-04-19T00:09:45.150Z] 
[2024-04-19T00:09:45.150Z] Gradle Test Run :streams:test > Gradle Test Executor 
30 > RackAwareGraphConstructorFactoryTest > shouldReturnMinCostConstructor() 
PASSED
[2024-04-19T00:09:45.150Z] 
[2024-04-19T00:09:45.150Z] Gradle Test Run :streams:test > Gradle Test Executor 
30 > RackAwareGraphConstructorFactoryTest > 
shouldReturnBalanceSubtopologyConstructor() STARTED
[2024-04-19T00:09:45.150Z] 
[2024-04-19T00:09:45.150Z] Gradle Test Run :streams:test > Gradle Test Executor 
30 > RackAwareGraphConstructorFactoryTest > 
shouldReturnBalanceSubtopologyConstructor() PASSED
[2024-04-19T00:09:45.150Z] 
[2024-04-19T00:09:45.150Z] Gradle Test Run :streams:test > Gradle Test Executor 
30 > DefaultTaskExecutorTest > shouldSetUncaughtStreamsException() STARTED
[2024-04-19T00:09:46.161Z] 
[2024-04-19T00:09:46.161Z] Gradle Test Run :streams:test > Gradle Test Executor 
30 > DefaultTaskExecutorTest > shouldSetUncaughtStreamsException() PASSED
[2024-04-19T00:09:46.161Z] 
[2024-04-19T00:09:46.161Z] Gradle Test Run :streams:test > Gradle Test Executor 
30 > DefaultTaskExecutorTest > shouldClearTaskTimeoutOnProcessed() STARTED
[2024-04-19T00:09:46.161Z] 
[2024-04-19T00:09:46.161Z] Gradle Test Run :streams:test > Gradle Test Executor 
30 > DefaultTaskExecutorTest > shouldClearTaskTimeoutOnProcessed() PASSED
[2024-04-19T00:09:46.161Z] 
[2024-04-19T00:09:46.161Z] Gradle Test Run :streams:test > Gradle Test Executor 
30 > DefaultTaskExecutorTest > shouldUnassignTaskWhenRequired() STARTED
[2024-04-19T00:09:46.161Z] 
[2024-04-19T00:09:46.161Z] Gradle Test Run :streams:test > Gradle Test Executor 
30 > DefaultTaskExecutorTest > shouldUnassignTaskWhenRequired() PASSED
[2024-04-19T00:09:46.161Z] 
[2024-04-19T00:09:46.161Z] Gradle Test Run :streams:test > Gradle Test Executor 
30 > DefaultTaskExecutorTest > shouldClearTaskReleaseFutureOnShutdown() STARTED
[2024-04-19T00:09:46.161Z] 
[2024-04-19T00:09:46.161Z] Gradle Test Run :streams:test > Gradle Test Executor 
30 > DefaultTaskExecutorTest > shouldClearTaskReleaseFutureOnShutdown() PASSED
[2024-04-19T00:09:46.161Z] 
[2024-04-19T00:09:46.161Z] Gradle Test Run :streams:test > Gradle Test Executor 
30 > DefaultTaskExecutorTest > shouldProcessTasks() STARTED
[2024-04-19T00:09:46.161Z] 
[2024-04-19T00:09:46.161Z] Gradle Test Run :streams:test > Gradle Test Executor 
30 > DefaultTaskExecutorTest > shouldProcessTasks() PASSED
[2024-04-19T00:09:46.161Z] 
[2024-04-19T00:09:46.161Z] Gradle Test Run :streams:test > Gradle Test Executor 
30 > DefaultTaskExecutorTest > shouldPunctuateStreamTime() STARTED
[2024-04-19T00:09:46.161Z] 
[2024-04-19T00:09:46.161Z] Gradle Test Run :streams:test > Gradle Test Executor 
30 > DefaultTaskExecutorTest > shouldPunctuateStreamTime() PASSED
[2024-04-19T00:09:46.161Z] 
[2024-04-19T00:09:46.161Z] Gradle Test Run :streams:test > Gradle Test Executor 
30 > DefaultTaskExecutorTest > shouldShutdownTaskExecutor() STARTED
[2024-04-19T00:09:46.161Z] 
[2024-04-19T00:09:46.161Z] Gradle Test Run :streams:test > Gradle Test Executor 
30 > DefaultTaskExecutorTest > shouldShutdownTaskExecutor() PASSED
[2024-04-19T00:09:46.161Z] 
[2024-04-19T00:09:46.161Z] Gradle Test Run :streams:test > Gradle Test Executor 
30 > DefaultTaskExecutorTest > shouldAwaitProcessableTasksIfNoneAssignable() 
STARTED
[2024-04-19T00:09:46.161Z] 
[2024-04-19T00:09:46.161Z] Gradle Test Run :streams:test > Gradle Test Executor 
30 > DefaultTaskExecutorTest > shouldAwaitProcessableTasksIfNoneAssignable() 
PASSED
[2024-04-19T00:09:46.161Z] 
[2024-04-19T00:09:46.161Z] Gradle Test Run :streams:test > Gradle Test Executor 
30 > DefaultTaskExecutorTest > 
shouldRespectPunctuationDisabledByTaskExecutionMetadata() STARTED
[2024-04-19T00:09:46.161Z] 
[2024-04-19T00:09:46.161Z] Gradle Test Run :streams:test > Gradle Test Executor 
30 > DefaultTaskExecutorTest > 
shouldRespectPunctuationDisabledByTaskExecutionMetadata() PASSED
[2024-04-19T00:09:46.161Z] 
[2024-04-19T00:09:46.161Z] Gradle Test Run :streams:test > Gradle Test Executor 
30 > DefaultTaskExecutorTest > shouldSetTaskTimeoutOnTimeoutException() STARTED
[2024-04-19T00:09:46.161Z] 
[2024-04-19T00:09:46.161Z] Gradle Test Run :streams:test > Gradle Test Executor 
30 > DefaultTaskExecutorTest > 

Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-04-18 Thread Sophie Blee-Goldman
Thanks for all the context everyone. I think in light of all the
information
that's been shared, I would agree with David that neither the Record
nor ConsumerRecord feels appropriate and that we should just create
a new class/interface that holds the info we want to expose. We
definitely don't want to add each item as a separate parameter since
that will make evolution of this API difficult to do without being a hassle
for users (ie migrating off deprecated APIs and/or blowing up the API
surface area).

I also like the idea of using this to indicate what/where the actual
exception occurred. Alternatively, we could literally extend the
RecordDeserializationException exception class and have a separate
exception type to indicate whether the failure occurred when deserializing
the key or value. Not necessarily saying this is better than just adding
info to the container class/interface, just listing all the options. My
impression is that we tend to favor presenting info in exceptions by
extending the exception type itself rather than adding data to the
exception class. I'm not sure I even agree with that pattern, but that's
been my observation so far.
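
For illustration only, a hedged sketch of what that alternative could look like
(names hypothetical, and the value-side variant would mirror it):

public class KeyDeserializationException extends RecordDeserializationException {
    // Hypothetical subclass signalling that the record's key could not be
    // deserialized; a ValueDeserializationException would mirror this shape.
    public KeyDeserializationException(TopicPartition partition,
                                       long offset,
                                       String message,
                                       Throwable cause) {
        super(partition, offset, message, cause);
    }
}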

On Thu, Apr 18, 2024 at 10:47 AM David Arthur
 wrote:

> Hi Fred, thanks for the KIP. Seems like a useful improvement.
>
> As others have mentioned, I think we should avoid exposing Record in this
> way.
>
> Using ConsumerRecord seems okay, but maybe not the best fit for this case
> (for the reasons Matthias gave).
>
> Maybe we could create a new container interface to hold the partially
> deserialized data? This could also indicate to the exception handler
> whether the key, the value, or both had deserialization errors.
>
> Thanks,
> David
>
> On Thu, Apr 18, 2024 at 10:16 AM Frédérik Rouleau
>  wrote:
>
> > Hi,
> >
> > But I guess my main question is really about what metadata we really
> > > want to add to `RecordDeserializationException`? `Record` expose all
> > > kind of internal (serialization) metadata like `keySize()`,
> > > `valueSize()` and many more. For the DLQ use-case it seems we don't
> > > really want any of these? So I am wondering if just adding
> > > key/value/ts/headers would be sufficient?
> > >
> >
> > I think that key/value/ts/headers, topicPartition and offset are all we
> > need. I do not see any usage for other metadata. If someone has a use
> case,
> > I would like to know it.
> >
> > So in that case we can directly add the data into the exception. We can
> > keep ByteBuffer for the local field instead of byte[], that will avoid
> > memory allocation if users do not require it.
> > I wonder if we should return the ByteBuffer or directly the byte[] (or
> both
> > ?) which is more convenient for end users. Any thoughts?
> > Then we can have something like:
> >
> > public RecordDeserializationException(TopicPartition partition,
> >  long offset,
> >  ByteBuffer key,
> >  ByteBuffer value,
> >  Header[] headers,
> >  long timestamp,
> >  String message,
> >  Throwable cause);
> >
> > public TopicPartition topicPartition();
> >
> > public long offset();
> >
> > public long timestamp();
> >
> > public byte[] key(); // Will allocate the array on call
> >
> > public byte[] value(); // Will allocate the array on call
> >
> > public Header[] headers();
> >
> >
> >
> > Regards,
> > Fred
> >
>
>
> --
> -David
>


Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-04-18 Thread Matthias J. Sax

Thanks for the KIP Alieh! It addresses an important case for error handling.

I agree that using this handler would be an expert API, as mentioned by 
a few people. But I don't think that's a reason to not add it. It's 
always a tricky tradeoff deciding what to expose to users while avoiding foot 
guns, but we added similar handlers to Kafka Streams and have had good 
experience with them. Hence, I understand, but don't share, the concern raised.


I also agree that there is some responsibility by the user to understand 
how such a handler should be implemented to not drop data by accident. 
But it seems unavoidable and acceptable.


While I understand that a "simpler / reduced" API (eg via configs) might 
also work, I personally prefer a full handler. Configs have the same 
issue that they could be misused, potentially leading to incorrectly 
dropped data, but at the same time are less flexible (and thus maybe 
even harder to use correctly...?). Based on my experience, there are also 
often weird corner cases for which it makes sense to also drop records for 
other exceptions, and a full handler has the advantage of full 
flexibility and "absolute power!".


To be fair: I don't know the exact code paths of the producer in 
detail, so please keep me honest. But my understanding is that the KIP 
aims to allow users to react to internal exceptions, and decide to keep 
retrying internally, swallow the error and drop the record, or raise the 
error?


Maybe the KIP would need to be a little bit more precise about which errors we 
want to cover -- I don't think this list must be exhaustive, as we can 
always do follow-up KIPs to also apply the handler to other errors to 
expand the scope of the handler. The KIP does mention examples, but it 
might be good to explicitly state for what cases the handler gets applied?


I am also not sure if CONTINUE and FAIL are enough options? Don't we 
need three options? Or would `CONTINUE` have different meaning depending 
on the type of error? Ie, for a retryable error `CONTINUE` would mean 
keep retrying internally, but for a non-retryable error `CONTINUE` means 
swallow the error and drop the record? This semantic overload seems 
tricky for users to reason about, so it might be better to split `CONTINUE` 
into two cases -> `RETRY` and `SWALLOW` (or some better names).
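
For illustration, a hedged sketch of what such a split could look like (interface
and method names are hypothetical, not the KIP's actual proposal):

public interface ClientExceptionHandler extends Configurable {

    enum Response {
        FAIL,     // surface the error to the application
        RETRY,    // keep retrying internally (retriable errors only)
        SWALLOW   // drop the record and continue
    }

    // Hypothetical callback: decide how the producer should react to an
    // internal exception hit while sending the given record.
    Response handle(ProducerRecord<byte[], byte[]> record, Exception exception);
}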


Additionally, should we just ship a `DefaultClientExceptionHandler` 
which would return `FAIL`, for backward compatibility? Or not have any 
default handler to begin with and allow it to be `null`? I don't see the 
need for a specific `TransactionExceptionHandler`. To me, the goal 
should be to not modify the default behavior at all, but to just allow 
users to change the default behavior if there is a need.


What is missing from the KIP, though, is how the handler is passed into the 
producer. Would we need a new config which allows setting a 
custom handler? And/or would we allow passing in an instance via the 
constructor, or add a new method to set a handler?



-Matthias

On 4/18/24 10:02 AM, Andrew Schofield wrote:

Hi Alieh,
Thanks for the KIP.

Exception handling in the Kafka producer and consumer is really not ideal.
It’s even harder working out what’s going on with the consumer.

I’m a bit nervous about this KIP and I agree with Chris that it could do with 
additional
motivation. This would be an expert-level interface given how complicated
the exception handling for Kafka has become.

7. The application is not really aware of the batching being done on its behalf.
The ProduceResponse can actually return an array of records which failed
per batch. If you get RecordTooLargeException, and want to retry, you probably
need to remove the offending records from the batch and retry it. This is 
getting fiddly.

8. There is already o.a.k.clients.producer.Callback. I wonder whether an
alternative might be to add a method to the existing Callback interface, such 
as:

   ClientExceptionResponse onException(Exception exception)

It would be called when a ProduceResponse contains an error, but the
producer is going to retry. It tells the producer whether to go ahead with the 
retry
or not. The default implementation would be to CONTINUE, because that’s
just continuing to retry as planned. Note that this is a per-record callback, so
the application would be able to understand which records failed.

By using an existing interface, we already know how to configure it and we know
about the threading model for calling it.


Thanks,
Andrew




On 17 Apr 2024, at 18:17, Chris Egerton  wrote:

Hi Alieh,

Thanks for the KIP! The issue with writing to non-existent topics is
particularly frustrating for users of Kafka Connect and has been the source
of a handful of Jira tickets over the years. My thoughts:

1. An additional detail we can add to the motivation (or possibly rejected
alternatives) section is that this kind of custom retry logic can't be
implemented by hand by, e.g., setting retries to 0 in the producer config
and handling 

Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-04-18 Thread David Arthur
Hi Fred, thanks for the KIP. Seems like a useful improvement.

As others have mentioned, I think we should avoid exposing Record in this
way.

Using ConsumerRecord seems okay, but maybe not the best fit for this case
(for the reasons Matthias gave).

Maybe we could create a new container interface to hold the partially
deserialized data? This could also indicate to the exception handler
whether the key, the value, or both had deserialization errors.
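
For illustration, a hedged sketch of such a container (names hypothetical, not
part of the KIP):

public interface DeserializationErrorData {
    // Hypothetical container for the partially deserialized record.
    TopicPartition topicPartition();
    long offset();
    long timestamp();
    ByteBuffer key();      // raw key bytes, may be null
    ByteBuffer value();    // raw value bytes, may be null
    Headers headers();
    Failure failure();     // which part(s) failed to deserialize

    enum Failure { KEY, VALUE, KEY_AND_VALUE }
}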

Thanks,
David

On Thu, Apr 18, 2024 at 10:16 AM Frédérik Rouleau
 wrote:

> Hi,
>
> But I guess my main question is really about what metadata we really
> > want to add to `RecordDeserializationException`? `Record` expose all
> > kind of internal (serialization) metadata like `keySize()`,
> > `valueSize()` and many more. For the DLQ use-case it seems we don't
> > really want any of these? So I am wondering if just adding
> > key/value/ts/headers would be sufficient?
> >
>
> I think that key/value/ts/headers, topicPartition and offset are all we
> need. I do not see any usage for other metadata. If someone has a use case,
> I would like to know it.
>
> So in that case we can directly add the data into the exception. We can
> keep ByteBuffer for the local field instead of byte[], that will avoid
> memory allocation if users do not require it.
> I wonder if we should return the ByteBuffer or directly the byte[] (or both
> ?) which is more convenient for end users. Any thoughts?
> Then we can have something like:
>
> public RecordDeserializationException(TopicPartition partition,
>  long offset,
>  ByteBuffer key,
>  ByteBuffer value,
>  Header[] headers,
>  long timestamp,
>  String message,
>  Throwable cause);
>
> public TopicPartition topicPartition();
>
> public long offset();
>
> public long timestamp();
>
> public byte[] key(); // Will allocate the array on call
>
> public byte[] value(); // Will allocate the array on call
>
> public Header[] headers();
>
>
>
> Regards,
> Fred
>


-- 
-David


[jira] [Resolved] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer

2024-04-18 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-16389.

Resolution: Fixed

> consumer_test.py’s test_valid_assignment fails with new consumer
> 
>
> Key: KAFKA-16389
> URL: https://issues.apache.org/jira/browse/KAFKA-16389
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
> Attachments: KAFKA-16389.patch, consumer.log
>
>
> The following error is reported when running the {{test_valid_assignment}} 
> test from {{consumer_test.py}}:
>  {code}
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
> data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
> return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 
> 584, in test_valid_assignment
> wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, 
> consumer.current_assignment()),
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
> 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when 
> num_started 2: [('ducker@ducker05', []), ('ducker@ducker06', [])]
> {code}
> To reproduce, create a system test suite file named 
> {{test_valid_assignment.yml}} with these contents:
> {code:yaml}
> failures:
>   - 
> 'kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}'
> {code}
> Then set the {{TC_PATHS}} environment variable to include that test suite 
> file.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-04-18 Thread Matthias J. Sax
Andrew, thanks for the details about Consumer internals. That's super 
useful for this discussion! -- And it confirms my understanding.


I don't think we want to use the ConsumerRecord type though, 
because for a DLQ the handler wants to write the message into some DLQ 
topic, and thus needs the raw key and value, so only 
`ConsumerRecord<byte[], byte[]>` would work (or maybe `ConsumerRecord<ByteBuffer, ByteBuffer>`).


While I would be ok with using `ConsumerRecord`, I don't see a huge 
advantage compared to passing in all fields we are interested in 
one-by-one. In the end, if the data is written into a DLQ topic, the 
`ConsumerRecord` object cannot be reused (but the handler will build a 
`ProducerRecord`), and `ConsumerRecord` would "just" be a container -- I 
don't think it would simplify user-code or provide any other benefit, 
but just add an (unnecessary?) level of wrapping/indirection?


The only advantage I would see, is for the case that new interesting 
metadata fields get added to the message format -- for this case, using 
`ConsumerRecord` would automatically include these new fields, and we 
don't need to modify the exception class to add them explicitly. But as 
this happens very rarely, it does not seem to provide a huge benefit.


In the end, I would be fine either way. Curious to hear what others think.


-Matthias



On 4/18/24 8:41 AM, Andrew Schofield wrote:

Hi,
Thanks for the KIP. I think it’s an interesting idea and it seems to work 
nicely with how
the clients work today.

Recently, I’ve been digging in to the record deserialization code in the 
consumer as
part of implementing KIP-932. It’s pretty nasty in there.

There are essentially two kinds of problems that can be encountered when
converting the response from a Fetch RPC into ConsumerRecords.

1) The batch might be corrupt, so you can’t even work out what records are in 
there.
2) Individual records cannot be deserialized.

In the second case, the consumer accumulates records from a Fetch response until
it hits a record which it cannot deserialize. If it has any parsed records 
already to return
to the application, it parks the RecordDeserializationException and returns the 
records
so far. Then, on the next poll, because there are no parsed records waiting, it 
throws
the RecordDeserializationException. And so it continues until all fetched 
records are
processed.

I don’t really think of org.apache.kafka.common.record.Record as an external 
interface.
I think it’s a bit close to the wire to be putting on a user-facing interface.

We do know almost everything about the bad record at the point where the
deserialization fails. I wonder whether actually we could use
ConsumerRecord<byte[], byte[]> or even ConsumerRecord<ByteBuffer, ByteBuffer> :)
in the constructor for the RecordDeserializationException. I don’t really like 
having
a long list of each of the individual items of the record’s parts like 
timestamp and so on.
It’s nicer to have an interface to the record that we can evolve without having 
to change
the constructor for this exception.

Thanks,
Andrew


On 18 Apr 2024, at 15:13, Frédérik Rouleau  
wrote:

Hi,

But I guess my main question is really about what metadata we really

want to add to `RecordDeserializationException`? `Record` expose all
kind of internal (serialization) metadata like `keySize()`,
`valueSize()` and many more. For the DLQ use-case it seems we don't
really want any of these? So I am wondering if just adding
key/value/ts/headers would be sufficient?



I think that key/value/ts/headers, topicPartition and offset are all we
need. I do not see any usage for other metadata. If someone has a use case,
I would like to know it.

So in that case we can directly add the data into the exception. We can
keep ByteBuffer for the local field instead of byte[], that will avoid
memory allocation if users do not require it.
I wonder if we should return the ByteBuffer or directly the byte[] (or both
?) which is more convenient for end users. Any thoughts?
Then we can have something like:

public RecordDeserializationException(TopicPartition partition,
 long offset,
 ByteBuffer key,
 ByteBuffer value,
 Header[] headers,
 long timestamp,
 String message,
 Throwable cause);

public TopicPartition topicPartition();

public long offset();

public long timestamp();

public byte[] key(); // Will allocate the array on call

public byte[] value(); // Will allocate the array on call

public Header[] headers();



Regards,
Fred




Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-04-18 Thread Andrew Schofield
Hi Alieh,
Thanks for the KIP.

Exception handling in the Kafka producer and consumer is really not ideal.
It’s even harder working out what’s going on with the consumer.

I’m a bit nervous about this KIP and I agree with Chris that it could do with 
additional
motivation. This would be an expert-level interface given how complicated
the exception handling for Kafka has become.

7. The application is not really aware of the batching being done on its behalf.
The ProduceResponse can actually return an array of records which failed
per batch. If you get RecordTooLargeException, and want to retry, you probably
need to remove the offending records from the batch and retry it. This is 
getting fiddly.

8. There is already o.a.k.clients.producer.Callback. I wonder whether an
alternative might be to add a method to the existing Callback interface, such 
as:

  ClientExceptionResponse onException(Exception exception)

It would be called when a ProduceResponse contains an error, but the
producer is going to retry. It tells the producer whether to go ahead with the 
retry
or not. The default implementation would be to CONTINUE, because that’s
just continuing to retry as planned. Note that this is a per-record callback, so
the application would be able to understand which records failed.

By using an existing interface, we already know how to configure it and we know
about the threading model for calling it.
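
For illustration, a hedged sketch of that suggestion (the ClientExceptionResponse
type and the default behaviour are assumptions, not an agreed design):

public interface Callback {

    // Existing method, unchanged.
    void onCompletion(RecordMetadata metadata, Exception exception);

    // Hypothetical addition: called when a ProduceResponse contains an error
    // that the producer is about to retry; CONTINUE means proceed with the
    // retry as planned.
    default ClientExceptionResponse onException(Exception exception) {
        return ClientExceptionResponse.CONTINUE;
    }
}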


Thanks,
Andrew



> On 17 Apr 2024, at 18:17, Chris Egerton  wrote:
>
> Hi Alieh,
>
> Thanks for the KIP! The issue with writing to non-existent topics is
> particularly frustrating for users of Kafka Connect and has been the source
> of a handful of Jira tickets over the years. My thoughts:
>
> 1. An additional detail we can add to the motivation (or possibly rejected
> alternatives) section is that this kind of custom retry logic can't be
> implemented by hand by, e.g., setting retries to 0 in the producer config
> and handling exceptions at the application level. Or rather, it can, but 1)
> it's a bit painful to have to reimplement at every call-site for
> Producer::send (and any code that awaits the returned Future) and 2) it's
> impossible to do this without losing idempotency on retries.
>
> 2. That said, I wonder if a pluggable interface is really the right call
> here. Documenting the interactions of a producer with
> a ClientExceptionHandler instance will be tricky, and implementing them
> will also be a fair amount of work. I believe that there needs to be some
> more granularity for how writes to non-existent topics (or really,
> UNKNOWN_TOPIC_OR_PARTITION and related errors from the broker) are handled,
> but I'm torn between keeping it simple with maybe one or two new producer
> config properties, or a full-blown pluggable interface. If there are more
> cases that would benefit from a pluggable interface, it would be nice to
> identify these and add them to the KIP to strengthen the motivation. Right
> now, I'm not sure the two that are mentioned in the motivation are
> sufficient.
>
> 3. Alternatively, a possible compromise is for this KIP to introduce new
> properties that dictate how to handle unknown-topic-partition and
> record-too-large errors, with the thinking that if we introduce a pluggable
> interface later on, these properties will be recognized by the default
> implementation of that interface but could be completely ignored or
> replaced by alternative implementations.
>
> 4. (Nit) You can remove the "This page is meant as a template for writing a
> KIP..." part from the KIP. It's not a template anymore :)
>
> 5. If we do go the pluggable interface route, wouldn't we want to add the
> possibility for retry logic? The simplest version of this could be to add a
> RETRY value to the ClientExceptionHandlerResponse enum.
>
> 6. I think "SKIP" or "DROP" might be clearer instead of "CONTINUE" for
> the ClientExceptionHandlerResponse enum, since they cause records to be
> dropped.
>
> Cheers,
>
> Chris
>
> On Wed, Apr 17, 2024 at 12:25 PM Justine Olshan
>  wrote:
>
>> Hey Alieh,
>>
>> I echo what Omnia says, I'm not sure I understand the implications of the
>> change and I think more detail is needed.
>>
>> This comment also confused me a bit:
>> * {@code ClientExceptionHandler} that continues the transaction even if a
>> record is too large.
>> * Otherwise, it makes the transaction to fail.
>>
>> Relatedly, I've been working with some folks on a KIP for transactions
>> errors and how they are handled. Specifically for the
>> RecordTooLargeException (and a few other errors), we want to give a new
>> error category for this error that allows the application to choose how it
>> is handled. Maybe this KIP is something that you are looking for? Stay
>> tuned :)
>>
>> Justine
>>
>>
>>
>>
>>
>> On Wed, Apr 17, 2024 at 8:03 AM Omnia Ibrahim 
>> wrote:
>>
>>> Hi Alieh,
>>> Thanks for the KIP! I have a couple of comments
>>> - You mentioned in the KIP motivation,
 Another example for which a 

Re: [ANNOUNCE] New Kafka PMC Member: Greg Harris

2024-04-18 Thread Justine Olshan
Congratulations Greg!

On Thu, Apr 18, 2024 at 12:03 AM Matthias J. Sax  wrote:

> Congrats Greg!
>
> On 4/15/24 10:44 AM, Hector Geraldino (BLOOMBERG/ 919 3RD A) wrote:
> > Congrats! Well deserved
> >
> > From: dev@kafka.apache.org At: 04/13/24 14:42:22 UTC-4:00To:
> dev@kafka.apache.org
> > Subject: [ANNOUNCE] New Kafka PMC Member: Greg Harris
> >
> > Hi all,
> >
> > Greg Harris has been a Kafka committer since July 2023. He has remained
> > very active and instructive in the community since becoming a committer.
> > It's my pleasure to announce that Greg is now a member of Kafka PMC.
> >
> > Congratulations, Greg!
> >
> > Chris, on behalf of the Apache Kafka PMC
> >
> >
>


Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-04-18 Thread Andrew Schofield
Hi,
Thanks for the KIP. I think it’s an interesting idea and it seems to work 
nicely with how
the clients work today.

Recently, I’ve been digging in to the record deserialization code in the 
consumer as
part of implementing KIP-932. It’s pretty nasty in there.

There are essentially two kinds of problems that can be encountered when
converting the response from a Fetch RPC into ConsumerRecords.

1) The batch might be corrupt, so you can’t even work out what records are in 
there.
2) Individual records cannot be deserialized.

In the second case, the consumer accumulates records from a Fetch response until
it hits a record which it cannot deserialize. If it has any parsed records 
already to return
to the application, it parks the RecordDeserializationException and returns the 
records
so far. Then, on the next poll, because there are no parsed records waiting, it 
throws
the RecordDeserializationException. And so it continues until all fetched 
records are
processed.

I don’t really think of org.apache.kafka.common.record.Record as an external 
interface.
I think it’s a bit close to the wire to be putting on a user-facing interface.

We do know almost everything about the bad record at the point where the
deserialization fails. I wonder whether actually we could use
ConsumerRecord<byte[], byte[]> or even ConsumerRecord<ByteBuffer, ByteBuffer> :)
in the constructor for the RecordDeserializationException. I don’t really like 
having
a long list of each of the individual items of the record’s parts like 
timestamp and so on.
It’s nicer to have an interface to the record that we can evolve without having 
to change
the constructor for this exception.

Thanks,
Andrew

> On 18 Apr 2024, at 15:13, Frédérik Rouleau  
> wrote:
> 
> Hi,
> 
> But I guess my main question is really about what metadata we really
>> want to add to `RecordDeserializationException`? `Record` expose all
>> kind of internal (serialization) metadata like `keySize()`,
>> `valueSize()` and many more. For the DLQ use-case it seems we don't
>> really want any of these? So I am wondering if just adding
>> key/value/ts/headers would be sufficient?
>> 
> 
> I think that key/value/ts/headers, topicPartition and offset are all we
> need. I do not see any usage for other metadata. If someone has a use case,
> I would like to know it.
> 
> So in that case we can directly add the data into the exception. We can
> keep ByteBuffer for the local field instead of byte[], that will avoid
> memory allocation if users do not require it.
> I wonder if we should return the ByteBuffer or directly the byte[] (or both
> ?) which is more convenient for end users. Any thoughts?
> Then we can have something like:
> 
> public RecordDeserializationException(TopicPartition partition,
> long offset,
> ByteBuffer key,
> ByteBuffer value,
> Header[] headers,
> long timestamp,
> String message,
> Throwable cause);
> 
> public TopicPartition topicPartition();
> 
> public long offset();
> 
> public long timestamp();
> 
> public byte[] key(); // Will allocate the array on call
> 
> public byte[] value(); // Will allocate the array on call
> 
> public Header[] headers();
> 
> 
> 
> Regards,
> Fred



Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-04-18 Thread Frédérik Rouleau
Hi,

But I guess my main question is really about what metadata we really
> want to add to `RecordDeserializationException`? `Record` expose all
> kind of internal (serialization) metadata like `keySize()`,
> `valueSize()` and many more. For the DLQ use-case it seems we don't
> really want any of these? So I am wondering if just adding
> key/value/ts/headers would be sufficient?
>

I think that key/value/ts/headers, topicPartition and offset are all we
need. I do not see any usage for other metadata. If someone has a use case,
I would like to know it.

So in that case we can directly add the data into the exception. We can
keep ByteBuffer for the local field instead of byte[], that will avoid
memory allocation if users do not require it.
I wonder if we should return the ByteBuffer or directly the byte[] (or both
?) which is more convenient for end users. Any thoughts?
Then we can have something like:

public RecordDeserializationException(TopicPartition partition,
 long offset,
 ByteBuffer key,
 ByteBuffer value,
 Header[] headers,
 long timestamp,
 String message,
 Throwable cause);

public TopicPartition topicPartition();

public long offset();

public long timestamp();

public byte[] key(); // Will allocate the array on call

public byte[] value(); // Will allocate the array on call

public Header[] headers();
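
For illustration, a hedged sketch of how an application-level DLQ could use
these accessors, assuming the constructor and getters above are adopted as
proposed (the DLQ topic name and producer are placeholders):

try {
    process(consumer.poll(Duration.ofSeconds(1)));
} catch (RecordDeserializationException e) {
    // Forward the raw bytes of the poison record to a hypothetical DLQ topic.
    dlqProducer.send(new ProducerRecord<>("my-dlq-topic", null, e.timestamp(),
            e.key(), e.value(), Arrays.asList(e.headers())));
    // Skip past the bad record so the next poll() can make progress.
    consumer.seek(e.topicPartition(), e.offset() + 1);
}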



Regards,
Fred


[jira] [Created] (KAFKA-16585) No way to forward message from punctuation method in the FixedKeyProcessor

2024-04-18 Thread Stanislav Spiridonov (Jira)
Stanislav Spiridonov created KAFKA-16585:


 Summary: No way to forward message from punctuation method in the 
FixedKeyProcessor
 Key: KAFKA-16585
 URL: https://issues.apache.org/jira/browse/KAFKA-16585
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.6.2
Reporter: Stanislav Spiridonov


The FixedKeyProcessorContext can forward only a FixedKeyRecord. This class 
doesn't have a public constructor and can only be created from an existing record. 
But such a record is usually absent in the punctuation method.
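
For illustration (hedged sketch, class name hypothetical), the limitation shows up like this:
{code:java}
// Inside the punctuation callback there is no incoming FixedKeyRecord to
// derive a new record from, and FixedKeyRecord has no public constructor,
// so nothing can be forwarded from there.
public class ExpiryNotifier implements FixedKeyProcessor<String, String, String> {
    private FixedKeyProcessorContext<String, String> context;

    @Override
    public void init(final FixedKeyProcessorContext<String, String> context) {
        this.context = context;
        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            // context.forward(...) requires a FixedKeyRecord<String, String>,
            // but there is no record available here and no way to build one.
        });
    }

    @Override
    public void process(final FixedKeyRecord<String, String> record) {
        // Forwarding works here because an existing record can be transformed.
        context.forward(record.withValue(record.value().toUpperCase()));
    }
}
{code}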



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-18 Thread Bruno Cadonna

Hi Sophie,

Thanks for the clarifications!

(1)
What about replacing Node* with KafkaStreams* or StreamsClient*? I 
prefer KafkaStreams* since that class represents the Kafka Streams 
client. I am also fine with KafkaStreamsClient*. I really would like to 
avoid introducing a new term in Kafka Streams for which we already have 
an equivalent term, even if that term is used on the brokers, since that is a 
different level of abstraction. Additionally, I have never been a big 
fan of the term "instance".


(4)
I think the question is if we need to retrieve assignment metadata by 
task for a Kafka client or if it is enough to iterate over the assigned 
tasks. Could you explain why we cannot add additional metadata to the 
class AssignedTask?
The interface KafkaStreamsAssignment (a.k.a. NodeAssignment ;-) ) could 
be something like


public interface NodeAssignment {
    ProcessID processId();

    Instant followupRebalanceDeadline();

    Set<AssignedTask> assignment();

    enum AssignedTaskType {
        STATELESS,
        STATEFUL,
        STANDBY
    }

    static class AssignedTask {
        AssignedTaskType type();
        TaskId id();

        ... other metadata needed in future
    }
}
If we need to retrieve an assigned task by task ID, maybe it is better to 
add methods like assignedFor(TaskId) and not to expose the Map.


(5)
I am in favor of ApplicationState but I am also fine ApplicationMetadata 
if you insist.


(6)
Is

void finalAssignment(GroupAssignment assignment, GroupSubscription 
subscription);


kind of a callback? If yes, would it make sense to call it 
onAssignmentComputed()?



(7)
What do you think of changing the TaskAssignmentUtils signatures to

public static TaskAssignment default*Assignment(final ApplicationState 
applicationState, final TaskAssignment taskAssignment, ...) {...}


to avoid mutating the assignment in place?


Best,
Bruno

On 4/17/24 7:50 PM, Sophie Blee-Goldman wrote:

Thanks Bruno! I can provide a bit of context behind some of these
decisions but I just want to say up front that I agree with every single one
of your points, though I am going to push back a bit on the first one.

[1] The idea here is to help avoid some confusion around the overloaded
term "client", which can mean either "an instance of Kafka Streams" or
"a consumer/producer client". The problem is that the former applies to
the entire Streams process and therefore should be interpreted as "all
of the StreamThread on an instance" whereas the latter is typically used
interchangeably to mean the consumer client in the consumer group,
which implies a scope of just a single StreamThread on an instance.
The "Node" name here was an attempt to clear this up, since differentiating
between instance and thread level is critical to understanding and properly
implementing the custom assignor.

I do see what you mean about there not being a concept of Node in the
Kafka Streams codebase, and that we usually do use "instance" when we
need to differentiate between consumer client/one StreamThread and
Kafka Streams client/all StreamThreads. As I'm typing this I'm convincing
myself even more that we shouldn't just use "Client" without further
distinction, but I'm not sure "Node" has to be the answer either.

Could we replace "Node" with "KafkaStreamsClient" or is that too wordy?
I honestly do still like Node personally, and don't see what's wrong with
introducing a new term since the "node" terminology is used heavily
on the broker side and it means effectively the same thing in theory.
But if we can't compromise between "Node" and "Client" then maybe
we can settle on "Instance"? (Does feel a bit wordy too...maybe "Process"?)

[2] Good catch(es). Makes sense to me

[3] Totally agree, a single enum makes way more sense

[4] Here again I can provide some background -- this is actually following
a pattern that we used when refactoring the old PartitionAssignor into
the new (at the time) Consumer PartitionAssignor interface. The idea was
to wrap the return type to protect the assign method in case we ever wanted
to add something to what was returned, such as metadata for the entire
group. This way we could avoid a massively disruptive deprecation-and-
migration cycle for everyone who implements a custom assignor.
That said, I just checked the GroupAssignment class we added for this
in the ConsumerPartitionAssignor interface, and to this day we've never
added anything other than the map of consumer client to assignment.

So maybe that was overly cautious. I'd be ok with flattening this map out.
I guess the question is just, can we imagine any case in which we might
want the custom assignor to return additional metadata? To be honest
I think this might be more likely than with the plain consumer client case,
but again, I'm totally fine with just flattening it to a plain map return
type

[5] I guess not. I think ApplicationMetadata was added during the initial
KIP discussion so that's probably why it doesn't follow the same naming
pattern. 

Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-04-18 Thread Matthias J. Sax

Hi,

I am actually not sure if using `Record` is really the right thing? 
While `Record` is technically public API, it does not seem to be 
intended to be exposed to end users?


But I guess my main question is really about what metadata we really 
want to add to `RecordDeserializationException`? `Record` expose all 
kind of internal (serialization) metadata like `keySize()`, 
`valueSize()` and many more. For the DLQ use-case it seems we don't 
really want any of these? So I am wondering if just adding 
key/value/ts/headers would be sufficient?


The motivation section of the KIP is somewhat sparse about DLQ details, 
so it's hard to judge what is needed / useful and what would be a leaky 
abstraction?


About "when we cannot construct a `ConsumerRecord` -- I was not really 
digging into it until know, and was just following Kirks comment 
blindly. But looking into the code, we would only not be able to 
construct a `CosumerRecord` when either key or value deserialization 
fails? But as we would pass in byte[] type it would not matter. -- Kirk 
did also mention a corrupted batch, but it seems for this case we might 
not even hit the deserialization code path, but would error out earlier?


I was also looking into the build setup, and I think the idea of the 
import control is to have some sanity check about import dependencies. I 
currently don't see why we should not add an allow rule for Record.


But if we decide to not pass in Record/ConsumerRecord both questions are 
void anyway. Of course, for this case, we would need to add a getter 
method for each metadata field we add (but I think that would be totally 
ok?)


I also see now that the old constructor is deprecated, and thus, I 
think using `Optional` as a return type is not required (already reflected 
on the wiki page).


Bottom line seems to be: the motivation about what metadata is needed 
for the DLQ use-case is not described in much detail and thus it's hard 
to judge what the right design might be?


The wiki account thing is unfortunately nothing we can fix on our side. 
We did file a ticket with INFRA team, but need to wait for them to 
address it... In the meantime, if you can provide the missing 
information, and what you want to get edited, I can help to update the 
wiki page accordingly.



-Matthias

On 4/16/24 11:18 AM, Sophie Blee-Goldman wrote:

Also ignore everything I said about Streams earlier. I didn't look closely
enough on my first pass over the KIP and thought this was changing the
DeserializationExceptionHandler in Streams. I see now that this is
actually about the consumer client's DeserializationException so everything
I said about using a ByteArray Deserialization and Kafka Streams doesn't
apply here. The important thing is just that it still deserializes one
record
at a time, and essentially throws this when it fails to convert the Record
type into a ConsumerRecord type. So there's always only one record
at a time to consider.

Sorry for any confusion I caused

On Tue, Apr 16, 2024 at 11:15 AM Sophie Blee-Goldman 
wrote:


Ah, thanks for the additional context. I should have looked at the code
before I opened my mouth (so to speak)

In that case, I fully agree that using Record instead of ConsumerRecord
makes sense. It does indeed seem like by definition, if there is a
DeserializationException then there is no ConsumerRecord since this
is where/how it gets thrown:

try {
...
return new ConsumerRecord<>(...);
} catch (RuntimeException e) {
...
throw new RecordDeserializationException(...);
}

As you mentioned the Record is an input to the method so we definitely have
one of those, and imo, it makes sense to use. As far as I can tell it's
just
a regular public interface so exposing it shouldn't be an issue just based
on
the class itself. But I'm still a bit concerned about the checkstyle
complaint.

I'll try to find someone who can explain why or if we should avoid
returning
a Record type here. Other than that, I'd say the KIP LGTM as-is and we
could kick off voting

On Tue, Apr 16, 2024 at 10:47 AM Frédérik Rouleau
 wrote:


Thanks Sophie,

I can write something in the KIP on how KStreams solves that issue, but as
I can't create a Wiki account, I will have to find someone to do this on
my
behalf (if someone can work on solving that wiki account creation, it
would
be great).

The biggest difference between Record and ConsumerRecord is that data are
stored using ByteBuffer and byte arrays, respectively.

For the Record option, the object already exists in the parsing method, so
it's roughly just a parameter type change in the Exception. The point is
just about exposing the Record class externally. By the way, the name
Record is also making some IDE a bit crazy by confusing it with the new
Java Record feature. An alternative could be to create another wrapper type,
or just include the key and value ByteBuffers in the
RecordDeserializationException itself.

For the ConsumerRecord option, it requires allocating byte arrays, even

[jira] [Created] (KAFKA-16584) Make log processing summary configurable or debug

2024-04-18 Thread Andras Hatvani (Jira)
Andras Hatvani created KAFKA-16584:
--

 Summary: Make log processing summary configurable or debug
 Key: KAFKA-16584
 URL: https://issues.apache.org/jira/browse/KAFKA-16584
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.6.2
Reporter: Andras Hatvani


Currently *every two minutes for every stream thread* statistics will be logged 
on INFO log level. 
{code:log}
2024-04-18T09:18:23.790+02:00  INFO 33178 --- [service] [-StreamThread-1] 
o.a.k.s.p.internals.StreamThread         : stream-thread 
[service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 total 
records, ran 0 punctuators, and committed 0 total tasks since the last update 
{code}

This is absolutely unnecessary and even harmful since it fills the logs, and 
thus storage space, with unwanted and useless data. The other INFO logs are 
useful and helpful, so it's not an option to raise the log level to WARN.
Please make the logProcessingSummary 
* either a DEBUG-level log, or
* configurable so that it can be disabled.
This is the relevant code: 
https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [ANNOUNCE] New Kafka PMC Member: Greg Harris

2024-04-18 Thread Matthias J. Sax

Congrats Greg!

On 4/15/24 10:44 AM, Hector Geraldino (BLOOMBERG/ 919 3RD A) wrote:

Congrats! Well deserved

From: dev@kafka.apache.org At: 04/13/24 14:42:22 UTC-4:00To:  
dev@kafka.apache.org
Subject: [ANNOUNCE] New Kafka PMC Member: Greg Harris

Hi all,

Greg Harris has been a Kafka committer since July 2023. He has remained
very active and instructive in the community since becoming a committer.
It's my pleasure to announce that Greg is now a member of Kafka PMC.

Congratulations, Greg!

Chris, on behalf of the Apache Kafka PMC




[jira] [Created] (KAFKA-16583) Update from 3.4.0 to 3.7.0 image write failed in Kraft mode

2024-04-18 Thread HanXu (Jira)
HanXu created KAFKA-16583:
-

 Summary: Update from 3.4.0 to 3.7.0 image write failed in Kraft 
mode
 Key: KAFKA-16583
 URL: https://issues.apache.org/jira/browse/KAFKA-16583
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Affects Versions: 3.7.0
Reporter: HanXu
Assignee: HanXu


How to reproduce:
1. Launch a 3.4.0 controller and a 3.4.0 broker (Broker A) in KRaft mode;
2. Create a topic with 1 partition;
3. Launch a 3.4.0 broker (Broker B) in KRaft mode and reassign the partition from 
step 2 to Broker B;
4. Upgrade Broker B to 3.7.0;

Broker B will keep logging the following error:
{code:java}
[2024-04-18 14:46:54,144] ERROR Encountered metadata loading fault: Unhandled 
error initializing new publishers 
(org.apache.kafka.server.fault.LoggingFaultHandler)
org.apache.kafka.image.writer.UnwritableMetadataException: Metadata has been 
lost because the following could not be represented in metadata version 
3.4-IV0: the directory assignment state of one or more replicas
at 
org.apache.kafka.image.writer.ImageWriterOptions.handleLoss(ImageWriterOptions.java:94)
at 
org.apache.kafka.metadata.PartitionRegistration.toRecord(PartitionRegistration.java:391)
at org.apache.kafka.image.TopicImage.write(TopicImage.java:71)
at org.apache.kafka.image.TopicsImage.write(TopicsImage.java:84)
at org.apache.kafka.image.MetadataImage.write(MetadataImage.java:155)
at 
org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:295)
at 
org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:266)
at 
org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
at java.base/java.lang.Thread.run(Thread.java:840)
{code}

Bug:
- When reassigning a partition, PartitionRegistration#merge will set the new 
replicas with the UNASSIGNED directory;
- But in metadata version 3.4-IV0, PartitionRegistration#toRecord only allows the 
MIGRATING directory:
{code:java}
if (options.metadataVersion().isDirectoryAssignmentSupported()) {
record.setDirectories(Uuid.toList(directories));
} else {
for (Uuid directory : directories) {
if (!DirectoryId.MIGRATING.equals(directory)) {
options.handleLoss("the directory assignment state of one 
or more replicas");
break;
}
}
}
{code}
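
One possible direction (hedged sketch, not necessarily the actual fix) would be to 
also tolerate UNASSIGNED directories when the metadata version cannot represent 
directory assignments:
{code:java}
// Hedged sketch of one possible fix direction (not the committed patch):
// treat UNASSIGNED like MIGRATING when writing an image at a metadata
// version without directory assignment support, instead of reporting loss.
if (options.metadataVersion().isDirectoryAssignmentSupported()) {
    record.setDirectories(Uuid.toList(directories));
} else {
    for (Uuid directory : directories) {
        if (!DirectoryId.MIGRATING.equals(directory)
                && !DirectoryId.UNASSIGNED.equals(directory)) {
            options.handleLoss("the directory assignment state of one or more replicas");
            break;
        }
    }
}
{code}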




--
This message was sent by Atlassian Jira
(v8.20.10#820010)