Re: Creating kafka wiki id

2024-05-09 Thread Matthias J. Sax
Self-service to create an account is currently not working. Please reply 
on https://issues.apache.org/jira/browse/INFRA-25451 to request a wiki 
account.


I'll update the wiki page for now until the issue is resolved.

-Matthias

On 5/7/24 8:25 AM, 黃竣陽 wrote:

Hello, I want to create a KIP, but I don't have a Kafka wiki id. I went to the
page (https://cwiki.apache.org/confluence/signup.action) but it doesn't
have a button to register an account.
Please help me create an account. Thank you



Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-05-09 Thread Matthias J. Sax

Thanks Sophie! Makes it much clearer where you are coming from.

About the type unsafety: isn't this also an issue for the 
`handleSerializationException` case, because the handler is used for all 
sink topics, and thus key/value types are not really known w/o taking the 
sink topic into account? -- So I am not sure if having two handler 
methods really helps much with regard to type safety?


Just want to make this small comment for completeness. Let's hear what 
others think. Given that we both don't have a strong opinion but just a 
personal preference, we should be able to come to a conclusion quickly 
and get this KIP approved for 3.8 :)



-Matthias

On 5/8/24 3:12 PM, Sophie Blee-Goldman wrote:

Well I definitely don't feel super strongly about it, and more importantly,
I'm not a user. So I will happily defer to the preference of anyone who
will actually be using this feature. But  I'll explain my reasoning:

There *is* a relevant distinction between these two callbacks -- because
the passed-in record will have a different type depending on whether it was
a serialization exception or something else. Even if we combined them into
a single #handle method, users will still end up implementing two distinct
branches depending on whether it was a serialization exception or not,
since that determines the type of the ProducerRecord passed in.

Not to mention they'll need to cast it to a ProducerRecord<byte[], byte[]>
when we could have just passed it in as this type via a dedicated callback.
And note that because of the generics, they can't do an instanceof check to
make sure that the record type is ProducerRecord<byte[], byte[]> and will
have to suppress the "unchecked cast" warning.

So if we combined the two callbacks, their handler will look something like
this:

@SuppressWarnings("unchecked")
public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                 final ProducerRecord record,
                                                 final Exception exception) {
    if (exception instanceof RecordSerializationException) {
        // assumes a static import of SerializationExceptionOrigin.KEY
        final RecordSerializationException serializationException = (RecordSerializationException) exception;
        if (serializationException.origin().equals(KEY)) {
            log.error("Failed to serialize key", exception);
        } else {
            log.error("Failed to serialize value", exception);
        }
    } else {
        // for non-serialization errors the record is already serialized
        final ProducerRecord<byte[], byte[]> serializedRecord = (ProducerRecord<byte[], byte[]>) record;
        log.error("Failed to produce record with serialized key={} and serialized value={}",
                serializedRecord.key(), serializedRecord.value());
    }
    return ProductionExceptionHandlerResponse.FAIL;
}

That seems like the most basic case, and it still has distinct logic
even if they ultimately handle exceptions the same way. And looking forward
to KIP-1034: Dead-letter queues, it seems all the more likely that the
actual handling response might be different depending on whether it's a
serialization exception or not: a serialized record can probably be
retried/sent to a DLQ, whereas a record that can't be serialized should not
(can't, really) be forwarded to a DLQ. So if they're going to have
completely different implementations depending on whether it's a
serialization exception, why not just give them two separate callbacks?
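
For illustration only, a rough sketch of what the two-callback shape could look like from the user's side; the exact parameters (ErrorHandlerContext, SerializationExceptionOrigin) are still being discussed in this thread, so treat the signatures as assumptions rather than the final KIP API:

public class MyProductionExceptionHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                     final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        // record is already serialized here, so key()/value() are byte arrays
        log.error("Failed to produce record with serialized key={} and serialized value={}",
                record.key(), record.value());
        return ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public ProductionExceptionHandlerResponse handleSerializationException(final ErrorHandlerContext context,
                                                                           final ProducerRecord<?, ?> record,
                                                                           final Exception exception,
                                                                           final SerializationExceptionOrigin origin) {
        // record still carries the original key/value types; origin says whether key or value failed
        log.error("Failed to serialize {} of record", origin, exception);
        return ProductionExceptionHandlerResponse.FAIL;
    }
}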

And that's all assuming the user is perfectly aware of the different
exception types and their implications for the type of the ProducerRecord.
Many people might just miss the existence of the
RecordSerializationException altogether --
there are already so many different exception types, ESPECIALLY when it
comes to the Producer. Not to mention they'll need to understand the
nuances of how the ProducerRecord type changes depending on the type of
exception that's passed in. And on top of all that, they'll need to know
that there is metadata stored in the RecordSerializationException regarding
the origin of the error. Whereas if we just passed in the
SerializationExceptionOrigin to a #handleSerializationException callback, well,
that's pretty impossible to miss.

That all just seems like a lot for most people to have to understand to
implement a ProductionExceptionHandler, which imo is not at all an advanced
feature and should be as straightforward and easy to use as possible.

Lastly -- I don't think it's quite fair to compare this to the
RecordDeserializationException. We have a dedicated handler that's just for
deserialization exceptions specifically, hence there's no worry about users
having to be aware of the different exception types they might have to deal
with in the DeserializationExceptionHandler. Whereas serialization
exceptions are just a subset of what might get passed in to the
ProductionExceptionHandler...

Just explaining my reasoning -- in the end I leave it up to the KIP authors
and anyone who will actually be using this feature in their applications :)



On Tue, May 7, 2024 at 8:35 PM Matthias J. Sax  wrote:


@Loic, yes, what you describe is exactly what I had in mind.



@Sophie, can you elaborate a little bit?


First of all, I agree that it makes sense to maintain the two separate
callbacks for the ProductionExceptionHandler, since one of them is
specifi

Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-05-07 Thread Matthias J. Sax
e issues, we decided to be limited to a short list of exceptions. I
included *RecordTooLargeException* and *UnknownTopicOrPartitionException*.
Open to suggestion for adding some more ;-)

KIP Updates:
- clarified the way that the user should configure the Producer to use the
custom handler. I think adding a producer config property is the cleanest one.
- changed the *ClientExceptionHandler* to *ProducerExceptionHandler* to be
closer to what we are changing.
- added the ProducerRecord as the input parameter of the handle() method as well.
- increased the response types to 3 to have fail and two types of continue.
- The default behaviour is having no custom handler, having the
corresponding config parameter set to null. Therefore, the KIP provides no
default implementation of the interface.
- We follow the interface solution as described in the Rejected
Alternatives section.


Cheers,
Alieh


On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax <mj...@apache.org> wrote:



Thanks for the KIP Alieh! It addresses an important case for error handling.

I agree that using this handler would be an expert API, as mentioned by a
few people. But I don't think that is a reason to not add it. It's always a
tricky tradeoff what to expose to users and how to avoid foot guns, but we
added similar handlers to Kafka Streams, and have good experience with it.
Hence, I understand, but don't share, the concern raised.

I also agree that there is some responsibility on the user to understand
how such a handler should be implemented to not drop data by accident. But
that seems unavoidable and acceptable.

While I understand that a "simpler / reduced" API (eg via configs) might
also work, I personally prefer a full handler. Configs have the same issue
that they could be mis-used, potentially leading to incorrectly dropped
data, but at the same time are less flexible (and thus maybe even harder to
use correctly...?). Based on my experience, there are also often weird
corner cases for which it makes sense to also drop records for other
exceptions, and a full handler has the advantage of full flexibility and
"absolute power!".

To be fair: I don't know the exact code paths of the producer in detail, so
please keep me honest. But my understanding is that the KIP aims to allow
users to react to internal exceptions, and decide to keep retrying
internally, swallow the error and drop the record, or raise the error?

Maybe the KIP needs to be a little bit more precise about what errors we
want to cover -- I don't think this list must be exhaustive, as we can
always do follow-up KIPs to also apply the handler to other errors and
expand the scope of the handler. The KIP does mention examples, but it
might be good to explicitly state for what cases the handler gets applied?

I am also not sure if CONTINUE and FAIL are enough options? Don't we need
three options? Or would `CONTINUE` have a different meaning depending on
the type of error? Ie, for a retriable error `CONTINUE` would mean keep
retrying internally, but for a non-retriable error `CONTINUE` means swallow
the error and drop the record? This semantic overload seems tricky for
users to reason about, so it might be better to split `CONTINUE` into two
cases -> `RETRY` and `SWALLOW` (or some better names).
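
Just to visualize that suggestion, a minimal sketch of such a three-way response enum, using the placeholder names from this email (not a final KIP signature):

public enum ProducerExceptionHandlerResponse {
    FAIL,     // raise the error to the application
    RETRY,    // keep retrying internally (retriable errors)
    SWALLOW   // swallow the error and drop the record (non-retriable errors)
}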


Additionally, should we just ship a `DefaultClientExceptionHandler` which
would return `FAIL`, for backward compatibility? Or not have any default
handler to begin with and allow it to be `null`? I don't see the need for a
specific `TransactionExceptionHandler`. To me, the goal should be to not
modify the default behavior at all, but to just allow users to change the
default behavior if there is a need.

What is missing in the KIP though is how the handler is passed into the
producer. Would we need a new config which allows setting a custom handler?
And/or would we allow passing in an instance via the constructor, or add a
new method to set a handler?


-Matthias

On 4/18/24 10:02 AM, Andrew Schofield wrote:

Hi Alieh,
Thanks for the KIP.

Exception handling in the Kafka producer and consumer is really not ideal.
It’s even harder working out what’s going on with the consumer.

I’m a bit nervous about this KIP and I agree with Chris that it could do
with additional motivation. This would be an expert-level interface given
how complicated the exception handling for Kafka has become.

7. The application is not really aware of the batching being done on its
behalf. The ProduceResponse can actually return an array of records which
failed per batch. If you get RecordTooLargeException, and want to retry,
you probably need to remove the offending records from the batch and retry
it. This is getting fiddly.

8. There is already o.a.k.clients.producer.Callback. I wond

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-05-07 Thread Matthias J. Sax

@Loic, yes, what you describe is exactly what I had in mind.



@Sophie, can you elaborate a little bit?


First of all, I agree that it makes sense to maintain the two separate
callbacks for the ProductionExceptionHandler, since one of them is
specifically for serialization exceptions while the other is used for
everything/anything else.


What makes a serialization exception special compared to other errors 
that it's valuable to treat it differently? Why can we put "everything 
else" into a single bucket? By your train of thought, should we not split 
out the "everything else" bucket into a different callback method for 
every different error? If not, why not, and why only for serialization errors?


From what I remember, historically, we added the 
ProductionExceptionHandler, and kinda just missed the serialization 
error case. And later, when we extended the handler, we just could not 
re-use the existing callback as it was typed with `<byte[], byte[]>` and 
it would have been an incompatible change; so it was rather a workaround 
to add the second method to the handler, but not really intended design?



It's of course only my personal opinion that I believe a single callback 
method is simpler/cleaner compared to sticking with two, and adding the 
new exception type to make it backward compatible seems worth it. It 
also kinda introduces the same pattern we use elsewhere (cf KIP-1036), 
which I actually think is an argument for introducing 
`RecordSerializationException`, to unify user experience across the board.


Would be great to hear from others about this point. It's not that I 
strongly object to having two methods, and I would not block this KIP on 
this question.




-Matthias


On 5/7/24 3:40 PM, Sophie Blee-Goldman wrote:

First of all, I agree that it makes sense to maintain the two separate
callbacks for the ProductionExceptionHandler, since one of them is
specifically for serialization exceptions while the other is used for
everything/anything else. I also think we can take advantage of this fact
to simplify things a bit and cut down on the amount of new stuff added to
the API by just adding a parameter to the #handleSerializationException
callback and use that to pass in the SerializationExceptionOrigin enum to
distinguish between key vs value. This way we wouldn't need to introduce
yet another exception type (the RecordSerializationException) just to pass
in this information.

Thoughts?

On Tue, May 7, 2024 at 8:33 AM Loic Greffier 
wrote:


Hi Matthias,

To sum up the proposed changes to the ProductionExceptionHandler callback
methods (106):

A new method ProductionExceptionHandler#handle is added with the following
signature:


ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, final ProducerRecord record, final Exception exception);

The ProducerRecord parameter has changed to accept a serialized or
non-serialized record.
Thus, the new ProductionExceptionHandler#handle method can handle both
production exception and serialization exception.

Both old ProductionExceptionHandler#handle and
ProductionExceptionHandler#handleSerializationException methods are now
deprecated.
The old ProductionExceptionHandler#handle method gets a default
implementation, so users do not have to implement a deprecated method.

To handle backward compatibility, the new
ProductionExceptionHandler#handle method gets a default implementation.


default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                  final ProducerRecord record,
                                                  final Exception exception) {
    if (exception instanceof RecordSerializationException) {
        // delegate to the (now deprecated) serialization callback so existing implementations keep working
        return this.handleSerializationException(record, exception.getCause());
    }
    // otherwise delegate to the (now deprecated) production-error callback
    return handle((ProducerRecord<byte[], byte[]>) record, exception);
}


The default implementation either invokes #handleSerializationException or
#handle depending on the type of the exception, thus users still relying on
deprecated ProductionExceptionHandler#handle
or ProductionExceptionHandler#handleSerializationException custom
implementations won't break.

The new ProductionExceptionHandler#handle method is now invoked in case of
serialization exception:


public <K, V> void send(final String topic, final K key, final V value, ...) {
    try {
        keyBytes = keySerializer.serialize(topic, headers, key);
        ...
    } catch (final ClassCastException exception) {
        ...
    } catch (final Exception exception) {
        try {
            response = productionExceptionHandler.handle(context, record,
                new RecordSerializationException(SerializationExceptionOrigin.KEY, exception));
        } catch (final Exception e) {
            ...
        }
    }
}


To wrap the original serialization exception and determine whether it comes
from the key or the value, a new exception class is created:


public class RecordSerializationException extends SerializationException {

    public enum SerializationExceptionOrigin {
        KEY,
        VALUE
    }

    public 

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-03 Thread Matthias J. Sax
Those are good questions... I could think of a few approaches, but I admit 
it might all be a little bit tricky to code up...


However if we don't solve this problem, I think this KIP does not really 
solve the core issue we are facing? In the end, if we rely on the 
`.checkpoint` file to compute a task assignment, but the `.checkpoint` 
file can be arbitrary stale after a crash because we only write it on a 
clean close, there would be still a huge gap that this KIP does not close?


For the case in which we keep the checkpoint file, this KIP would still 
help for "soft errors" in which KS can recover, and roll back the store. 
A significant win for sure. -- But hard crashes would still be a 
problem? We might assign tasks to the "wrong" instance, ie, one which is 
not most up to date, as the checkpoint information could be very outdated? 
Would we end up with a half-baked solution? Would this be good enough to 
justify the introduced complexity? In the end, for soft failures it's 
still a win. Just want to make sure we understand the limitations and 
make an educated decision.


Or do I miss something?


-Matthias

On 5/3/24 10:20 AM, Bruno Cadonna wrote:

Hi Matthias,


200:
I like the idea in general. However, it is not clear to me how the 
behavior should be with multiple stream threads in the same Kafka 
Streams client. What stream thread opens which store? How can a stream 
thread pass an open store to another stream thread that got the 
corresponding task assigned? How does a stream thread know that a task 
was not assigned to any of the stream threads of the Kafka Streams 
client? I have the feeling we should just keep the .checkpoint file on 
close for now to unblock this KIP and try to find a solution to get 
totally rid of it later.



Best,
Bruno



On 5/3/24 6:29 PM, Matthias J. Sax wrote:
101: Yes, but what I am saying is, that we don't need to flush the 
.position file to disk periodically, but only maintain it in main 
memory, and only write it to disk on close() to preserve it across 
restarts. This way, it would never be ahead, but might only lag? But 
with my better understanding about (102) it might be moot anyway...



102: Thanks for clarifying. Looked into the code now. Makes sense. 
Might be something to be worth calling out explicitly in the KIP 
writeup. -- Now that I realize that the position is tracked inside the 
store (not outside as the changelog offsets) it makes much more sense 
to pull position into RocksDB itself. In the end, it's actually a 
"store implementation" detail how it tracks the position (and kinda 
leaky abstraction currently, that we re-use the checkpoint file 
mechanism to track it and flush to disk).



200: I was thinking about this a little bit more, and maybe it's not 
too bad? When KS starts up, we could open all stores we find on local 
disk pro-actively, and keep them all open until the first rebalance 
finishes: For tasks we get assigned, we hand in the already opened 
store (this would amortize the cost to open the store before the 
rebalance) and for non-assigned tasks, we know the offset information 
won't change and we could just cache it in-memory for later reuse (ie, 
next rebalance) and close the store to free up resources? -- Assuming 
that we would get a large percentage of opened stores assigned as 
tasks anyway, this could work?
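
To sketch the idea in pseudo-code (everything below is hypothetical; none of these helpers exist today, this is only meant to illustrate the startup flow, not an API proposal):

// on startup, before the first rebalance (hypothetical helpers)
for (final TaskDirectory taskDir : stateDirectory.listAllTaskDirectories()) {
    final StateStore store = openStore(taskDir);                      // amortize the open cost
    cachedOffsets.put(taskIdOf(taskDir), changelogOffsetsOf(store));  // remember offsets for task-lag computation
    openStores.put(taskIdOf(taskDir), store);
}

// after the first rebalance finished
for (final TaskId id : openStores.keySet()) {
    if (assignedTasks.contains(id)) {
        reuseOpenStore(id, openStores.get(id));   // hand in the already opened store
    } else {
        openStores.get(id).close();               // free resources; keep only the cached offsets
    }
}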



-Matthias

On 5/3/24 1:29 AM, Bruno Cadonna wrote:

Hi Matthias,


101:
Let's assume a RocksDB store, but I think the following might be true 
also for other store implementations. With this KIP, if Kafka Streams 
commits the offsets, the committed offsets will be stored in an 
in-memory data structure (i.e. the memtable) and stay there until 
RocksDB decides that it is time to persist its in-memory data 
structure. If Kafka Streams writes its position to the .position file 
during a commit and a crash happens before RocksDB persist the 
memtable then the position in the .position file is ahead of the 
persisted offset. If IQ is done between the crash and the state store 
fully restored the changelog, the position might tell IQ that the 
state store is more up-to-date than it actually is.
In contrast, if Kafka Streams handles persisting positions the same 
as persisting offset, the position should always be consistent with 
the offset, because they are persisted together.



102:
I am confused about your confusion which tells me that we are talking 
about two different things.

You asked

"Do you intent to add this information [i.e. position] to the map 
passed via commit(final Map changelogOffsets)?"


and with what I wrote I meant that we do not need to pass the 
position into the implementation of the StateStore interface since 
the position is updated within the implementation of the StateStore 
interface (e.g. RocksDBStore [1]). My statement describes the 
behavior now, not the change proposed in this KIP, so it does not 
contradict what is stated in the KIP.



200:
Th

Re: Failed to initialize processor KSTREAM-AGGREGATE-0000000001

2024-05-03 Thread Matthias J. Sax

Can you file a ticket for it: https://issues.apache.org/jira/browse/KAFKA



On 5/3/24 3:34 AM, Penumarthi Durga Prasad Chowdary wrote:

With Kafka versions 3.5.1 and 3.7.0, we're still encountering persistent issues.
The Kafka Streams library is aligned with these Kafka versions. Upon
analysis of the logs, it seems that the problem may occur when a Kafka node
disconnects from Kafka Streams processes. This suspicion is supported by
the abundance of network messages indicating disconnections, such as


  org.apache.kafka.clients.NetworkClient
ThreadName: 
kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3-StreamThread-9
   Message: [Consumer
clientId=kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3-StreamThread-9-consumer,
groupId=kafka-streams-exec-0-test-store ] Node 102 disconnected.




On Mon, Apr 22, 2024 at 7:16 AM Matthias J. Sax  wrote:


Not sure either, but it sounds like a bug to me. Can you reproduce this
reliably? What version are you using?

It would be best if you could file a Jira ticket and we can take it from
there.


-Matthias

On 4/21/24 5:38 PM, Penumarthi Durga Prasad Chowdary wrote:

Hi,
I have an issue in kafka-streams while constructing kafka-streams state
store windows (TimeWindow and SessionWindow). While kafka-streams is
processing data, sometimes an intermittent kafka-streams process throws
the below error:

ThreadName: kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3-StreamThread-9
TraceID: unknown  CorelationID: eff36722-1430-4ffb-bf2e-c6e6cf6ae164
Message: stream-client [kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3] Replacing thread in the streams uncaught exception handler
org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-AGGREGATE-01
    at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:115)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:986)
    at org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:271)
    at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:716)
    at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:901)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:778)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
Caused by: java.lang.NullPointerException
    at org.apache.kafka.streams.kstream.internals.TimestampedTupleForwarder.<init>(TimestampedTupleForwarder.java:46)
    at org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.init(KStreamSessionWindowAggregate.java:138)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:107)
    ... 7 more

Here my understanding is the state-store is null, and at that time
stateStore.flush() gets invoked to send the data to the stateStore; this
leads to the above error. This error can be caught inside the kafka-streams
setUncaughtExceptionHandler:

streams.setUncaughtExceptionHandler(throwable -> {
    LOGGER.error("Exception in streams", throwable);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});

I'm uncertain about the exact reason for this issue. Everything seems to
be in order, including the Kafka cluster, and there are no errors in the
Kafka Streams except for a few logs indicating node disconnections.
Is there a better way to handle this error?
When can this issue happen?
I would like to express my gratitude in advance for any assistance
provided.






Re: [DISCUSS] KIP-1027 Add MockFixedKeyProcessorContext

2024-05-03 Thread Matthias J. Sax

Please also update the KIP.

To get a wiki account created, please request it via a comment on this 
ticket: https://issues.apache.org/jira/browse/INFRA-25451


After you have the account, please share your wiki id, and we can give 
you write permission on the wiki.




-Matthias

On 5/3/24 6:30 AM, Shashwat Pandey wrote:

Hi Matthias,

Sorry this fell out of my radar for a bit.

Revisiting the topic, I think you’re right and we accept the duplicated
nesting as an appropriate solution to not affect the larger public API.

I can update my PR with the change.

Regards,
Shashwat Pandey


On Wed, May 1, 2024 at 11:00 PM Matthias J. Sax  wrote:


Any updates on this KIP?

On 3/28/24 4:11 AM, Matthias J. Sax wrote:

It seems that `MockRecordMetadata` is a private class, and thus not part
of the public API. If there are any changes required, we don't need to
discuss on the KIP.


For `CapturedPunctuator` and `CapturedForward` it's a little bit more
tricky. My gut feeling is, that the classes might not need to be
changed, but if we use them within `MockProcessorContext` and
`MockFixedKeyProcessorContext` it might be weird to keep the current
nesting... The problem I see is, that it's not straightforward how to
move the classes w/o breaking compatibility, nor if we duplicate them as
standalone classes w/o a larger "splash radius". (We would need to add
new overloads for MockProcessorContext#scheduledPunctuators() and
MockProcessorContext#forwarded()).

Might be good to hear from others if we think it's worth this larger
changes to get rid of the nesting, or just accept the somewhat not ideal
nesting as it technically is not a real issue?


-Matthias


On 3/15/24 1:47 AM, Shashwat Pandey wrote:

Thanks for the feedback Matthias!

The reason I proposed the extension of MockProcessorContext was more
to do
with the internals of the class (MockRecordMetadata,
CapturedPunctuator and
CapturedForward).

However, I do see your point, I would then think to split
MockProcessorContext and MockFixedKeyProcessorContext, some of the
internal
classes should also be extracted i.e. MockRecordMetadata,
CapturedPunctuator and probably a new CapturedFixedKeyForward.

Let me know what you think!


Regards,
Shashwat Pandey


On Mon, Mar 11, 2024 at 10:09 PM Matthias J. Sax 
wrote:


Thanks for the KIP Shashwat. Closing this testing gap is great! It did
come up a few time already...

One question: why do you propose to `extend MockProcessorContext`?

Given how the actual runtime context classes are setup, it seems that
the regular context and fixed-key-context are distinct, and thus I
believe both mock-context classes should be distinct, too?

What I mean is that FixedKeyProcessorContext does not extend
ProcessorContext. Both classes have a common parent ProcessINGContext
(note the very similar but different names), but they are "siblings"
only, so why make the mock processor a parent-child relationship?

It seems better to do

public class MockFixedKeyProcessorContext
 implements FixedKeyProcessorContext,
RecordCollector.Supplier


Of course, if there is code we can share between both mock-context we
should so this, but it should not leak into the public API?


-Matthias



On 3/11/24 5:21 PM, Shashwat Pandey wrote:

Hi everyone,

I would like to start the discussion on




https://cwiki.apache.org/confluence/display/KAFKA/KIP-1027%3A+Add+MockFixedKeyProcessorContext


This adds MockFixedKeyProcessorContext to the Kafka Streams Test Utils
library.

Regards,
Shashwat Pandey











Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-05-03 Thread Matthias J. Sax
117f: Good point by Bruno. We should check for this, and could have an 
additional `INVALID_STANDBY_TASK` error code?



-Matthias

On 5/3/24 5:52 AM, Guozhang Wang wrote:

Hi Sophie,

Re: As for the return type of the TaskAssignmentUtils, I think that
makes sense. LGTM.

On Fri, May 3, 2024 at 2:26 AM Bruno Cadonna  wrote:


Hi Sophie,

117f:
I think, removing the STATEFUL and STATELESS types is not enough to
avoid the error Guozhang mentioned. The StreamsPartitionAssignor passes
the information whether a task is stateless or stateful into the task
assignor. However, the task assignor can return a standby task for a
stateless task which is inconsistent.

Echoing Matthias' statement about the missing UNKNOWN_TASK_ID error.

nit:
The titles of some code blocks in the KIP are not consistent with their
content, e.g., KafkaStreamsState <-> NodeState


Best,
Bruno

On 5/3/24 2:43 AM, Matthias J. Sax wrote:

Thanks Sophie. My bad. You are of course right about `TaskAssignment`
and the StreamsPartitionAssignor's responsibility to map tasks of an
instance to consumers. When I wrote my reply, I forgot about this detail.

Seems you did not add `UNKNOWN_TASK_ID` error yet as proposed by Guozhang?

Otherwise LGTM.


-Matthias

On 5/2/24 4:20 PM, Sophie Blee-Goldman wrote:

Guozhang:

117. All three additions make sense to me. However, while thinking about
how users would actually produce an assignment, I realized that it seems
silly to make it their responsibility to distinguish between a stateless
and stateful task when they return the assignment. The
StreamsPartitionAssignor already knows which tasks are stateful vs
stateless, so there's no need to add this extra step for users to
figure it
out themselves, and potentially make a mistake.

117f: So, rather than add a new error type for "inconsistent task types",
I'm proposing to just flatten the AssignedTask.Type enum to only "ACTIVE"
and "STANDBY", and remove the "STATEFUL" and "STATELESS" types
altogether.
Any objections?

-

-Thanks, fixed the indentation of headers under "User APIs" and
"Read-Only
APIs"

-As for the return type of the TaskAssignmentUtils methods, I don't
personally feel too strongly about this, but the reason for the return
type
being a Map<ProcessId, KafkaStreamsAssignment> rather than a TaskAssignment
is because they are meant to be used iteratively/to create a part of the
full assignment, and not necessarily a full assignment for each. Notice
that they all have an input parameter of the same type: Map<ProcessId,
KafkaStreamsAssignment>. The idea is you can take the output of any of
these and pass it in to another to generate or optimize another piece of
the overall assignment. For example, if you want to perform the
rack-aware
optimization on both active and standby tasks, you would need to call
#optimizeRackAwareActiveTasks and then forward the output to
#optimizeRackAwareStandbyTasks to get the final assignment. If we
return a
TaskAssignment, it will usually need to be unwrapped right away. Perhaps
more importantly, I worry that returning a TaskAssignment will make it
seem
like each of these utility methods return a "full" and final assignment
that can just be returned as-is from the TaskAssignor's #assign method.
Whereas they are each just a single step in the full assignment process,
and not the final product. Does that make sense?
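
To make that call pattern concrete, a minimal sketch of chaining the utils inside a custom assignor; the parameter lists are simplified/elided and myInitialAssignment is a made-up placeholder, so please read the signatures as illustrative only:

@Override
public TaskAssignment assign(final ApplicationState applicationState) {
    // start from some initial assignment (placeholder for whatever custom logic the user has)
    Map<ProcessId, KafkaStreamsAssignment> assignment = myInitialAssignment(applicationState);

    // each utility takes the intermediate map and returns an updated map, so steps can be chained
    assignment = TaskAssignmentUtils.optimizeRackAwareActiveTasks(applicationState, assignment /*, ... */);
    assignment = TaskAssignmentUtils.optimizeRackAwareStandbyTasks(applicationState, assignment /*, ... */);

    // only at the very end is the map wrapped into the final TaskAssignment
    return new TaskAssignment(assignment.values());
}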

On Thu, May 2, 2024 at 3:50 PM Sophie Blee-Goldman

wrote:


Matthias:

Thanks for the naming suggestions for the error codes. I was
definitely not happy with my original naming but couldn't think of
anything
better.  I like your proposals though, will update the KIP names.
I'll also
add a "NONE" option as well -- much better than just passing in null
for no
error.


OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned with the
same active task


   Would also be an error if assigned to two consumers of the same
client...

Needs to be rephrased.



Well the TaskAssignor only assigns tasks to KafkaStreams clients,
it's not
responsible for the assignment of tasks to consumers within a
KafkaStreams.
It would be a bug in the StreamsPartitionAssignor if it received a valid
assignment from the TaskAssignor with only one copy of a task
assigned to a
single KAfkaStreams client, and then somehow ended up assigning that
task
to multiple consumers on the KafkaStreams client. It wouldn't be the
TaskAssignor's fault so imo it would not make sense to include this
case in
the OVERLAPPING_CLIENT error (or as it's now called, ACTIVE_TASK_
ASSIGNED_MULTIPLE_TIMES).  Not to mention, if there was a bug that
caused
the StreamsPartitionAssignor to assign a task to multiple consumers, it
presumably wouldn't even notice since it's a bug -- if it did notice, it
can just fix the issue. The error codes are about communicating
unfixable
issues due to the TaskAssignor itself returning an invalid
assignment. The
phrasing is intentional, and (imo) correct as it is.

I do see your point about how th

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-03 Thread Matthias J. Sax
101: Yes, but what I am saying is, that we don't need to flush the 
.position file to disk periodically, but only maintain it in main 
memory, and only write it to disk on close() to preserve it across 
restarts. This way, it would never be ahead, but might only lag? But 
with my better understanding about (102) it might be moot anyway...



102: Thanks for clarifying. Looked into the code now. Makes sense. Might 
be something to be worth calling out explicitly in the KIP writeup. -- 
Now that I realize that the position is tracked inside the store (not 
outside as the changelog offsets) it makes much more sense to pull 
position into RocksDB itself. In the end, it's actually a "store 
implementation" detail how it tracks the position (and kinda leaky 
abstraction currently, that we re-use the checkpoint file mechanism to 
track it and flush to disk).



200: I was thinking about this a little bit more, and maybe it's not too 
bad? When KS starts up, we could open all stores we find on local disk 
pro-actively, and keep them all open until the first rebalance finishes: 
For tasks we get assigned, we hand in the already opened store (this 
would amortize the cost to open the store before the rebalance) and for 
non-assigned tasks, we know the offset information won't change and we 
could just cache it in-memory for later reuse (ie, next rebalance) and 
close the store to free up resources? -- Assuming that we would get a 
large percentage of opened stores assigned as tasks anyway, this could work?



-Matthias

On 5/3/24 1:29 AM, Bruno Cadonna wrote:

Hi Matthias,


101:
Let's assume a RocksDB store, but I think the following might be true 
also for other store implementations. With this KIP, if Kafka Streams 
commits the offsets, the committed offsets will be stored in an 
in-memory data structure (i.e. the memtable) and stay there until 
RocksDB decides that it is time to persist its in-memory data structure. 
If Kafka Streams writes its position to the .position file during a 
commit and a crash happens before RocksDB persist the memtable then the 
position in the .position file is ahead of the persisted offset. If IQ 
is done between the crash and the state store fully restored the 
changelog, the position might tell IQ that the state store is more 
up-to-date than it actually is.
In contrast, if Kafka Streams handles persisting positions the same as 
persisting offset, the position should always be consistent with the 
offset, because they are persisted together.



102:
I am confused about your confusion which tells me that we are talking 
about two different things.

You asked

"Do you intent to add this information [i.e. position] to the map passed 
via commit(final Map changelogOffsets)?"


and with what I wrote I meant that we do not need to pass the position 
into the implementation of the StateStore interface since the position 
is updated within the implementation of the StateStore interface (e.g. 
RocksDBStore [1]). My statement describes the behavior now, not the 
change proposed in this KIP, so it does not contradict what is stated in 
the KIP.



200:
This is about Matthias' main concern about rebalance metadata.
As far as I understand the KIP, Kafka Streams will only use the 
.checkpoint files to compute the task lag for unassigned tasks whose 
state is locally available. For assigned tasks, it will use the offsets 
managed by the open state store.


Best,
Bruno

[1] 
https://github.com/apache/kafka/blob/fcbfd3412eb746a0c81374eb55ad0f73de6b1e71/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L397


On 5/1/24 3:00 AM, Matthias J. Sax wrote:

Thanks Bruno.



101: I think I understand this better now. But just want to make sure 
I do. What do you mean by "they can diverge" and "Recovering after a 
failure might load inconsistent offsets and positions."


The checkpoint is the offset from the changelog, while the position is 
the offset from the upstream source topic, right? -- In the end, the 
position is about IQ, and if we fail to update it, it only means that 
there is some gap when we might not be able to query a standby task, 
because we think it's not up-to-date enough even if it is, which would 
resolve itself soon? Ie, the position might "lag", but it's not 
"inconsistent". Do we believe that this lag would be highly problematic?




102: I am confused.

The position is maintained inside the state store, but is persisted 
in the .position file when the state store closes. 


This contradicts the KIP:

 these position offsets will be stored in RocksDB, in the same column 
family as the changelog offsets, instead of the .position file




My main concern is currently about rebalance metadata -- opening 
RocksDB stores seems to be very expensive, but if we follow the KIP:


We will do this under EOS by updating the .checkpoint file whenever a 
store is close()d. 


It seems, having the offset 

Re: [VOTE] KIP-1036: Extend RecordDeserializationException exception

2024-05-03 Thread Matthias J. Sax

+1 (binding)

On 5/3/24 8:52 AM, Federico Valeri wrote:

Hi Fred, this is a useful addition.

+1 non binding

Thanks

On Fri, May 3, 2024 at 4:11 PM Andrew Schofield
 wrote:


Hi Fred,
Thanks for the KIP. It’s turned out nice and elegant I think. Definitely a 
worthwhile improvement.

+1 (non-binding)

Thanks,
Andrew


On 30 Apr 2024, at 14:02, Frédérik Rouleau  
wrote:

Hi all,

As there is no more activity for a while on the discuss thread, I think we
can start a vote.
The KIP is available on
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1036%3A+Extend+RecordDeserializationException+exception


If you have some feedback or suggestions, please participate to the
discussion thread:
https://lists.apache.org/thread/or85okygtfywvnsfd37kwykkq5jq7fy5

Best regards,
Fred




Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-05-03 Thread Matthias J. Sax
What about (106) to unify both existing callback methods of 
`ProductionExceptionHandler` into a single one, instead of adding two 
new ones?


Damien's last reply about it was:


I will think about unifying, I do agree it would be cleaner.


There was not follow up on this question, and the KIP right now still 
proposes to add two new methods, which I believe we could (should?) 
unify to:


default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                  final ProducerRecord record,
                                                  final Exception exception) {

Ie, we drop the generics `<byte[], byte[]>` on `ProducerRecord`, which 
allows you to also pass in a non-serialized ProducerRecord of any type 
for the serialization error case.


Btw: wondering if we also want to pass in a flag/enum about key vs value 
serialization error, similar to what was proposed in KIP-1036? The only 
"oddity" would be, that we call the handler for other error cases, too, not 
just for serialization exceptions. But we could tackle this by 
introducing a new class `RecordSerializationException` which would 
include the flag and would ensure that KS hands this exception into the 
handler. This would keep the handler interface/method itself clean.



-Matthias




On 5/3/24 2:15 AM, Loic Greffier wrote:

Hi Bruno,

Good catch, KIP has been updated accordingly.

Regards,
Loïc


Re: [VOTE] KIP-924: customizable task assignment for Streams

2024-05-02 Thread Matthias J. Sax

I left one more nit on the discuss thread. But overall LGTM.

+1 (binding)

Thanks Rohan and Sophie for driving this KIP.


-Matthias

On 4/29/24 2:07 PM, Sophie Blee-Goldman wrote:

+1 (binding)

thanks for driving this KIP!

On Tue, Apr 16, 2024 at 1:46 PM Rohan Desai  wrote:



https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams

As this KIP has been open for a while, and gone through a couple rounds of
review/revision, I'm calling a vote to get it approved.





Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-05-02 Thread Matthias J. Sax
StreamsPartitionAssignor will

immediately "fail" the rebalance and retry it by scheduling an immediate
followup rebalance.

I'm also a bit concerned here, as such endless retry loops have
happened in the past in my memory. Given that we would likely see most
of the user implementations be deterministic, I'm also leaning towards
failing the app immediately and letting the crowd educate us if there are
some very interesting scenarios out there that are not on our radar to
re-consider this, rather than getting hard to debug cases in the dark.

-

And here are just some nits about the KIP writings itself:

* I think some bullet points under `User APIs` and `Read-only APIs`
should have a lower level indention? It caught me for a sec until I
realized there are just two categories.

* In TaskAssignmentUtils, why not let those util functions return
`TaskAssignment` (to me it feels more consistent with the user APIs),
but instead return a Map<ProcessId, KafkaStreamsAssignment>?


Guozhang

On Tue, Apr 30, 2024 at 5:28 PM Matthias J. Sax  wrote:


I like the idea of error codes. Not sure if the names are ideal?
UNKNOWN_PROCESS_ID makes sense, but the other two seem a little bit
difficult to understand?

Should we be very descriptive (and also try to avoid coupling it to the
threading model -- important for the first error code):
   - ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES
   - ACTIVE_AND_STANDBY_ASSIGNED_TO_SAME_CLIENT (or _INSTANCE)

I think we also need to add NONE as option or make the error parameter
an `Optional`?
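
Just to visualize it, a small sketch of the enum shape being discussed (the type name and the exact set of codes are placeholders taken from this thread, not a settled API):

public enum AssignmentError {
    NONE,                                        // no error, assignment is valid
    UNKNOWN_PROCESS_ID,
    ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,
    ACTIVE_AND_STANDBY_ASSIGNED_TO_SAME_CLIENT
    // later messages in this thread also float UNKNOWN_TASK_ID and INVALID_STANDBY_TASK
}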



OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned with the

same active task


Would also be an error if assigned to two consumers of the same
client... Needs to be rephrased.




If any of these errors are detected, the StreamsPartitionAssignor

will immediately "fail" the rebalance and retry it by scheduling an
immediate followup rebalance.


Does this make sense? If we assume that the task-assignment is
deterministic, we would end up with an infinite retry loop? Also,
assuming that a client leaves the group, we cannot assign some task any
longer... I would rather throw a StreamsException and let the client

crash.




-Matthias

On 4/30/24 12:22 PM, Sophie Blee-Goldman wrote:

One last thing: I added an error code enum to be returned from the
#onAssignmentComputed method in case of an invalid assignment. I

created

one code for each of the invalid cases we described above. The

downside is

that this means we'll have to go through a deprecation cycle if we

want to

loosen up the restrictions on any of the enforced cases. The upside

is that

we can very clearly mark what is an invalid assignment and this will
(hopefully) assist users who are new to customizing assignments by

clearly

denoting the requirements, and returning a clear error if they are not
followed.

Of course the StreamsPartitionAssignor will also do a "fallback &

retry" in

this case by returning the same assignment to the consumers and

scheduling

a followup rebalance. I've added all of this to the TaskAssignor  and
#onAssignmentComputed javadocs, and added a section under "Public

Changes"

as well.

Please let me know if there are any concerns, or if you have

suggestions

for how else we can handle an invalid assignment

On Tue, Apr 30, 2024 at 11:39 AM Sophie Blee-Goldman <

sop...@responsive.dev>

wrote:


Thanks guys! I agree with what Lucas said about 117c, we can always

loosen

a restriction later and I don't want to do anything now that might

get in

the way of the new threading models.

With that I think we're all in agreement on 117. I'll update the KIP

to

include what we've discussed

(and will fix the remaining #finalAssignment mention as well, thanks
Bruno. Glad to have such good proof readers! :P)

On Tue, Apr 30, 2024 at 8:35 AM Bruno Cadonna 

wrote:



Hi again,

I forgot to ask whether you could add the agreement about handling
invalid assignment to the KIP.

Best,
Bruno

On 4/30/24 2:00 PM, Bruno Cadonna wrote:

Hi all,

I think we are converging!

117
a) fail: Since it is an invalid consumer assignment
b) pass: I agree that not assigning a task might be reasonable in

some

situations
c) fail: For the reasons Lucas pointed out. I am missing a good use

case

here.
d) fail: It is invalid


Somewhere in the KIP you still use finalAssignment() instead of the
wonderful method name onAssignmentComputed() ;-)
"... interface also includes a method named finalAssignment which

is

called with the final computed GroupAssignment ..."


Best,
Bruno


On 4/30/24 1:04 PM, Lucas Brutschy wrote:

Hi,

Looks like a great KIP to me!

I'm late, so I'm only going to comment on the last open point

117. I'm

against any fallbacks like "use the default assignor if the custom
assignment is invalid", as it's just going to hide bugs. For the 4
cases mentioned by Sophie:

117a) I'd fail immediately here, as it's an implementation bug,

and

should not lead to a valid consumer group as

[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and window.inner.serde.class in StreamsConfig

2024-05-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16260:

Component/s: clients

> Deprecate window.size.ms and window.inner.serde.class in StreamsConfig
> --
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Reporter: Lucia Cerchie
>Assignee: Lucia Cerchie
>Priority: Major
>  Labels: KIP
>
> {{window.size.ms}} and {{window.inner.serde.class}} are not a true 
> KafkaStreams config, and are ignored when set from a KStreams application. 
> Both belong on the client.
> KIP-1020: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]
>  





[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and window.inner.serde.class in StreamsConfig

2024-05-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16260:

Description: 
{{window.size.ms}} and {{window.inner.serde.class}} are not a true KafkaStreams 
config, and are ignored when set from a KStreams application. Both belong on 
the client.

KIP-1020: 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]

 

  was:
{{indwindow.size.ms}} and `is not a true KafkaStreams config, and results in an 
error when set from a KStreams application. It belongs on the client.

KIP-1020: 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]

 


> Deprecate window.size.ms and window.inner.serde.class in StreamsConfig
> --
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucia Cerchie
>Assignee: Lucia Cerchie
>Priority: Major
>  Labels: KIP
>
> {{window.size.ms}} and {{window.inner.serde.class}} are not a true 
> KafkaStreams config, and are ignored when set from a KStreams application. 
> Both belong on the client.
> KIP-1020: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]
>  





[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and window.inner.serde.class in StreamsConfig

2024-05-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16260:

Description: 
{{indwindow.size.ms}} and `is not a true KafkaStreams config, and results in an 
error when set from a KStreams application. It belongs on the client.

KIP-1020: 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]

 

  was:
{{window.size.ms}} and `is not a true KafkaStreams config, and results in an 
error when set from a KStreams application. It belongs on the client.

KIP-1020: 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]

 


> Deprecate window.size.ms and window.inner.serde.class in StreamsConfig
> --
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucia Cerchie
>Assignee: Lucia Cerchie
>Priority: Major
>  Labels: KIP
>
> {{indwindow.size.ms}} and `is not a true KafkaStreams config, and results in 
> an error when set from a KStreams application. It belongs on the client.
> KIP-1020: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]
>  





[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and inner.serde.class in StreamsConfig

2024-05-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16260:

Description: 
{{window.size.ms}} and `is not a true KafkaStreams config, and results in an 
error when set from a KStreams application. It belongs on the client.

KIP-1020: 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]

 

  was:
{{window.size.ms}}  is not a true KafkaStreams config, and results in an error 
when set from a KStreams application. It belongs on the client.

[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]

 


> Deprecate window.size.ms and inner.serde.class in StreamsConfig
> ---
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucia Cerchie
>Assignee: Lucia Cerchie
>Priority: Major
>  Labels: KIP
>
> {{window.size.ms}} and `is not a true KafkaStreams config, and results in an 
> error when set from a KStreams application. It belongs on the client.
> KIP-1020: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]
>  





[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and window.inner.serde.class in StreamsConfig

2024-05-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16260:

Summary: Deprecate window.size.ms and window.inner.serde.class in 
StreamsConfig  (was: Deprecate window.size.ms and inner.serde.class in 
StreamsConfig)

> Deprecate window.size.ms and window.inner.serde.class in StreamsConfig
> --
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucia Cerchie
>Assignee: Lucia Cerchie
>Priority: Major
>  Labels: KIP
>
> {{window.size.ms}} and `is not a true KafkaStreams config, and results in an 
> error when set from a KStreams application. It belongs on the client.
> KIP-1020: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]
>  





[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and inner.serde.class in StreamsConfig

2024-05-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16260:

Summary: Deprecate window.size.ms and inner.serde.class in StreamsConfig  
(was: Deprecate window.size.ms in StreamsConfig)

> Deprecate window.size.ms and inner.serde.class in StreamsConfig
> ---
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucia Cerchie
>Assignee: Lucia Cerchie
>Priority: Major
>  Labels: KIP
>
> {{window.size.ms}}  is not a true KafkaStreams config, and results in an 
> error when set from a KStreams application. It belongs on the client.
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]
>  





[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms in StreamsConfig

2024-05-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16260:

Description: 
{{window.size.ms}}  is not a true KafkaStreams config, and results in an error 
when set from a KStreams application. It belongs on the client.

[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]

 

  was:{{window.size.ms}}  is not a true KafkaStreams config, and results in an 
error when set from a KStreams application. It belongs on the client.


> Deprecate window.size.ms in StreamsConfig
> -
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucia Cerchie
>Assignee: Lucia Cerchie
>Priority: Major
>  Labels: KIP
>
> {{window.size.ms}}  is not a true KafkaStreams config, and results in an 
> error when set from a KStreams application. It belongs on the client.
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]
>  





[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms in StreamsConfig

2024-05-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16260:

Labels: KIP  (was: needs-kip)

> Deprecate window.size.ms in StreamsConfig
> -
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucia Cerchie
>Assignee: Lucia Cerchie
>Priority: Major
>  Labels: KIP
>
> {{window.size.ms}}  is not a true KafkaStreams config, and results in an 
> error when set from a KStreams application. It belongs on the client.





Re: [DISCUSS] KIP-1027 Add MockFixedKeyProcessorContext

2024-05-01 Thread Matthias J. Sax

Any updates on this KIP?

On 3/28/24 4:11 AM, Matthias J. Sax wrote:
It seems that `MockRecordMetadata` is a private class, and thus not part 
of the public API. If there are any changes required, we don't need to 
discuss on the KIP.



For `CapturedPunctuator` and `CapturedForward` it's a little bit more 
tricky. My gut feeling is, that the classes might not need to be 
changed, but if we use them within `MockProcessorContext` and 
`MockFixedKeyProcessorContext` it might be weird to keep the current 
nesting... The problem I see is, that it's not straightforward how to 
move the classes w/o breaking compatibility, nor if we duplicate them as 
standalone classes w/o a larger "splash radius". (We would need to add 
new overloads for MockProcessorContext#scheduledPunctuators() and 
MockProcessorContext#forwarded()).


Might be good to hear from others if we think it's worth this larger 
changes to get rid of the nesting, or just accept the somewhat not ideal 
nesting as it technically is not a real issue?



-Matthias


On 3/15/24 1:47 AM, Shashwat Pandey wrote:

Thanks for the feedback Matthias!

The reason I proposed the extension of MockProcessorContext was more 
to do
with the internals of the class (MockRecordMetadata, 
CapturedPunctuator and

CapturedForward).

However, I do see your point, I would then think to split
MockProcessorContext and MockFixedKeyProcessorContext, some of the 
internal

classes should also be extracted i.e. MockRecordMetadata,
CapturedPunctuator and probably a new CapturedFixedKeyForward.

Let me know what you think!


Regards,
Shashwat Pandey


On Mon, Mar 11, 2024 at 10:09 PM Matthias J. Sax  
wrote:



Thanks for the KIP Shashwat. Closing this testing gap is great! It did
come up a few time already...

One question: why do you propose to `extend MockProcessorContext`?

Given how the actual runtime context classes are setup, it seems that
the regular context and fixed-key-context are distinct, and thus I
believe both mock-context classes should be distinct, too?

What I mean is that FixedKeyProcessorContext does not extend
ProcessorContext. Both classes have a common parent ProcessINGContext
(note the very similar but different names), but they are "siblings"
only, so why make the mock processor a parent-child relationship?

It seems better to do

public class MockFixedKeyProcessorContext<KForward, VForward>
    implements FixedKeyProcessorContext<KForward, VForward>,
   RecordCollector.Supplier


Of course, if there is code we can share between both mock-context we
should so this, but it should not leak into the public API?


-Matthias



On 3/11/24 5:21 PM, Shashwat Pandey wrote:

Hi everyone,

I would like to start the discussion on


https://cwiki.apache.org/confluence/display/KAFKA/KIP-1027%3A+Add+MockFixedKeyProcessorContext


This adds MockFixedKeyProcessorContext to the Kafka Streams Test Utils
library.

Regards,
Shashwat Pandey







Re: outerjoin not joining after window

2024-05-01 Thread Matthias J. Sax

How do you know this?

First thing we do is write a log message in the value joiner. We don't see
the log message for the missed records.


Well, for left/right join results, the ValueJoiner would only be called 
when the window is closed... And for invalid input (or late records, ie, 
records which arrive out-of-order after their window was already closed), 
records would be dropped right away. So you cannot really infer whether a 
record made it into the join or not, or what happened after it made it into 
the `Processor`.


-> https://kafka.apache.org/documentation/#kafka_streams_task_monitoring

`dropped-records-total` is the name of the metric.
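In case it helps, here is a small sketch (assuming an already running 
KafkaStreams instance) of reading that metric programmatically instead of 
via JMX:

import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

final class DroppedRecordsCheck {
    // print the per-task "dropped-records-total" values of a running instance
    static void logDroppedRecords(final KafkaStreams streams) {
        for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
            if ("dropped-records-total".equals(entry.getKey().name())) {
                System.out.println(entry.getKey().tags() + " -> " + entry.getValue().metricValue());
            }
        }
    }
}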



-Matthias



On 5/1/24 11:35 AM, Chad Preisler wrote:

Hello,

We did some testing in our test environment today. We are seeing some
records processes where only one side of the join has a record. So that's
good. However, we are still seeing some records get skipped. They never hit
the value joiner (we write a log message first thing in the value joiner).
During the test we were putting some load on the system, so stream time was
advancing. We did notice that the join windows were taking much longer than
30 minutes to close and process records. Thirty minutes is the window plus
grace.


How do you know this?

First thing we do is write a log message in the value joiner. We don't see
the log message for the missed records.

I will try pushing the same records locally. However, we don't see any
errors in our logs and the stream does process one sided joins after the
skipped record. Do you have any docs on the "dropped records" metric? I did
a Google search and didn't find many good results for that.

Thanks,

Chad

On Tue, Apr 30, 2024 at 2:49 PM Matthias J. Sax  wrote:


Thanks for the information. I ran the code using Kafka locally. After
submitting some records inside and outside of the time window and grace,
the join performed as expected when running locally.


That gives some hope :)




However, they never get into the join.


How do you know this?


Did you check the metric for dropped records? Maybe records are
considered malformed and dropped? Are you using the same records in
production and in your local test?



Are there any settings for the stream client that would affect the join?


Not that I can think of... There is one more internal config, but as
long as data is flowing, it should not impact the result you see.



Are there any settings on the broker side that would affect the join?


No. The join is computed client side. Broker configs should not have any
impact.


f I increase the log level for the streams API would that

shed some light on what is happening?


I don't think it would help much. The code in question is
org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin -- but it
does not do any logging except WARN for the already mentioned "dropping
malformed" records that is also recorded via JMX.


WARN: "Skipping record due to null key or value. "



If you can identify a specific record from the input which would produce
an output, but does not, maybe you can try to feed it into your local
test env and try to re-produce the issue?


-Matthias

On 4/30/24 11:38 AM, Chad Preisler wrote:

Matthias,

Thanks for the information. I ran the code using Kafka locally. After
submitting some records inside and outside of the time window and grace,
the join performed as expected when running locally.

I'm not sure why the join is not working as expected when running against
our actual brokers. We are peeking at the records for the streams and we
are seeing the records get pulled. However, they never get into the join.
It's been over 24 hours since the expected records were created, and

there

has been plenty of traffic to advance the stream time. Only records that
have both a left and right side match are getting processed by the join.

Are there any settings for the stream client that would affect the join?

Are there any settings on the broker side that would affect the join?

The outer join is just one part of the topology. Compared to running it
locally there is a lot more data going through the app when running on

our

actual servers. If I increase the log level for the streams API would

that

shed some light on what is happening? Does anyone know if there are
specific packages that I should increase the log level for? Any specific
log message I can hone in on to tell me what is going on?

Basically, I'm looking for some pointers on where I can start looking.

Thanks,
Chad


On Tue, Apr 30, 2024 at 10:26 AM Matthias J. Sax 

wrote:



I expect the join to

execute after the 25 with one side of the join containing a record and

the

other being null


Given that you also have a grace period of 5 minutes, the result will
only be emitted after the grace-period passed and the window is closed
(not when window end time is reached).


One has a

naming convention of "KSTREAM_OUTERSHARED". I see a rec

[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-05-01 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842805#comment-17842805
 ] 

Matthias J. Sax commented on KAFKA-16514:
-

Thanks for the background! Makes sense.

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete

2024-04-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16644.
-
Resolution: Duplicate

> FK join emits duplicate tombstone on left-side delete
> -
>
> Key: KAFKA-16644
> URL: https://issues.apache.org/jira/browse/KAFKA-16644
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>        Reporter: Matthias J. Sax
>Priority: Major
>
> We introduced a regression bug in 3.7.0 release via KAFKA-14748. When a 
> left-hand side record is deleted, the join now emits two tombstone records 
> instead of one.
> The problem was not detected via unit test, because the tests use a `Map` 
> instead of a `List` when verifying the result topic records 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).]
> We should update all test cases to use `List` when reading from the output 
> topic, and of course fix the introduced bug: The 
> `SubscriptionSendProcessorSupplier` is sending two subscription records 
> instead of just a single one: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-30 Thread Matthias J. Sax

Thanks for the update.

I am wondering if we should use `ReadOnlyHeaders` instead of 
`ImmutableHeaders` as interface name?


Also, the returned `Header` interface is technically not immutable 
either, because `Header#value()` returns a mutable byte-array... Would we 
need a `ReadOnlyHeader` interface?


If yes, it seems that `ReadOnlyHeaders` should not be a super-interface 
of `Headers` but it would rather be a standalone interface, and a 
wrapper for a `Headers` instance? And `ReadOnlyHeader` would return some 
immutable type instead of `byte[]` for the value()?


An alternative would be to deep-copy the value byte-array, which would not 
be free, but given that we are talking about exception handling, it 
would not be on the hot code path, and thus might be acceptable?
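Just to make the wrapper idea concrete, a rough sketch (class name and the 
defensive-copy choice are only illustrative, not part of the KIP) of a 
standalone read-only view around an existing `Headers` instance:

import java.util.Arrays;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

// Hypothetical, for discussion only: read access plus defensive copies, so
// callers cannot mutate the wrapped headers or the underlying byte arrays.
final class ReadOnlyHeadersView {
    private final Headers delegate;

    ReadOnlyHeadersView(final Headers delegate) {
        this.delegate = delegate;
    }

    Header lastHeader(final String key) {
        return delegate.lastHeader(key);
    }

    byte[] lastHeaderValue(final String key) {
        final Header header = delegate.lastHeader(key);
        // copy, so the caller cannot modify the stored value
        return header == null ? null : Arrays.copyOf(header.value(), header.value().length);
    }
}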



The above seems to increase the complexity significantly though. Hence, 
I have second thoughts on the immutability question:


Do we really need to worry about mutability after all, because in the 
end, KS runtime won't read the Headers instance after the handler was 
called, and if a user modifies the passed in headers, there won't be any 
actual damage (ie, no side effects)? For this case, it might even be ok 
to also not add `ImmutableHeaders` to begin with?




Sorry for the forth and back (yes, forth and back, because back and 
forth does not make sense -- it's not logical -- just trying to fix 
English :D) as I did bring up the immutability question in the first 
place...




-Matthias

On 4/25/24 5:56 AM, Loic Greffier wrote:

Hi Matthias,

I have updated the KIP regarding points 103 and 108.

103.
I have suggested a new `ImmutableHeaders` interface to deal with the
immutability concern of the headers, which is basically the `Headers`
interface without the write accesses.

public interface ImmutableHeaders {
 Header lastHeader(String key);
 Iterable<Header> headers(String key);
 Header[] toArray();
}

The `Headers` interface can be updated accordingly:

public interface Headers extends ImmutableHeaders, Iterable<Header> {
 //…
}

Loïc


Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-30 Thread Matthias J. Sax

Thanks Bruno.



101: I think I understand this better now. But just want to make sure I 
do. What do you mean by "they can diverge" and "Recovering after a 
failure might load inconsistent offsets and positions."


The checkpoint is the offset from the changelog, while the position is 
the offset from the upstream source topic, right? -- In the end, the 
position is about IQ, and if we fail to update it, it only means that 
there is some gap when we might not be able to query a standby task, 
because we think it's not up-to-date enough even if it is, which would 
resolve itself soon? Ie, the position might "lag", but it's not 
"inconsistent". Do we believe that this lag would be highly problematic?




102: I am confused.

The position is maintained inside the state store, but is persisted in the .position file when the state store closes. 


This contradicts the KIP:


 these position offsets will be stored in RocksDB, in the same column family as 
the changelog offsets, instead of the .position file




My main concern is currently about rebalance metadata -- opening RocksDB 
stores seems to be very expensive, but if we follow the KIP:


We will do this under EOS by updating the .checkpoint file whenever a store is close()d. 


It seems, having the offset inside RocksDB does not help us at all? In 
the end, when we crash, we don't want to lose the state, but when we 
update the .checkpoint only on a clean close, the .checkpoint might be 
stale (ie, still contains the checkpoint when we opened the store when 
we got a task assigned).




-Matthias

On 4/30/24 2:40 AM, Bruno Cadonna wrote:

Hi all,

100
I think we already have such a wrapper. It is called 
AbstractReadWriteDecorator.



101
Currently, the position is checkpointed when an offset checkpoint is 
written. If we let the state store manage the committed offsets, we also 
need to let the state store manage the position, otherwise they 
might diverge. State store managed offsets can get flushed (i.e. 
checkpointed) to the disk when the state store decides to flush its 
in-memory data structures, but the position is only checkpointed at 
commit time. Recovering after a failure might load inconsistent offsets 
and positions.



102
The position is maintained inside the state store, but is persisted in 
the .position file when the state store closes. The only public 
interface that uses the position is IQv2 in a read-only mode. So the 
position is only updated within the state store and read from IQv2. No 
need to add anything to the public StateStore interface.



103
Deprecating managesOffsets() right away might be a good idea.


104
I agree that we should try to support downgrades without wipes. At least 
Nick should state in the KIP why we do not support it.



Best,
Bruno




On 4/23/24 8:13 AM, Matthias J. Sax wrote:
Thanks for splitting out this KIP. The discussion shows that it is a 
complex beast by itself, so it is worth discussing on its own.



Couple of question / comment:


100 `StateStore#commit()`: The JavaDoc says "must not be called by 
users" -- I would propose to put a guard in place for this, by either 
throwing an exception (preferable) or adding a no-op implementation 
(at least for our own stores, by wrapping them -- we cannot enforce it 
for custom stores I assume), and document this contract explicitly.
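A minimal sketch of such a guard (the class name is hypothetical, and the 
commit(Map<TopicPartition, Long>) shape is the one proposed by the KIP; in 
practice this would likely live in the existing store-wrapper layer):

import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Illustrative only.
final class UserFacingCommitGuard {
    // Kafka Streams manages commits itself; user code must never call this.
    public void commit(final Map<TopicPartition, Long> changelogOffsets) {
        throw new UnsupportedOperationException(
            "commit() must only be called by the Kafka Streams runtime");
    }
}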



101 adding `.position` to the store: Why do we actually need this? The 
KIP says "To ensure consistency with the committed data and changelog 
offsets" but I am not sure if I can follow? Can you elaborate why 
leaving the `.position` file as-is won't work?



If it's possible at all, it will need to be done by
creating temporary StateManagers and StateStores during rebalance. I 
think

it is possible, and probably not too expensive, but the devil will be in
the detail.


This sounds like a significant overhead to me. We know that opening a 
single RocksDB takes about 500ms, and thus opening RocksDB to get this 
information might slow down rebalances significantly.



102: It's unclear to me how the `.position` information is added. The KIP 
only says: "position offsets will be stored in RocksDB, in the same 
column family as the changelog offsets". Do you intend to add this 
information to the map passed via `commit(final Map<TopicPartition, Long> 
changelogOffsets)`? The KIP should describe this in more detail. 
Also, if my assumption is correct, we might want to rename the 
parameter and also have a better JavaDoc description?



103: Should we make it mandatory (long-term) that all stores 
(including custom stores) manage their offsets internally? Maintaining 
both options and thus both code paths puts a burden on everyone and 
makes the code messy. I would strongly prefer if we could have a mid-term 
path to get rid of supporting both.  -- For this case, we should 
deprecate the newly added `managesOffsets()` method right away, to 
point out that we intend to remove it eventually.

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-30 Thread Matthias J. Sax
I like the idea of error codes. Not sure if the names are ideal? 
UNKNOWN_PROCESS_ID makes sense, but the other two seem a little bit 
difficult to understand?


Should we be very descriptive (and also try to avoid coupling it to the 
threading model -- important for the first error code):

 - ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES
 - ACTIVE_AND_STANDBY_ASSIGNED_TO_SAME_CLIENT (or _INSTANCE)

I think we also need to add NONE as option or make the error parameter 
an `Optional`?
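For illustration, a sketch of what such an enum could look like (the 
constant names here are just placeholders for the discussion, not the names 
proposed in the KIP):

enum AssignmentError {
    NONE,
    ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,
    ACTIVE_AND_STANDBY_ASSIGNED_TO_SAME_INSTANCE,
    UNKNOWN_PROCESS_ID
}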




OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned with the same 
active task


Would also be an error if assigned to two consumers of the same 
client... Needs to be rephrased.





If any of these errors are detected, the StreamsPartitionAssignor will immediately 
"fail" the rebalance and retry it by scheduling an immediate followup rebalance.


Does this make sense? If we assume that the task-assignment is 
deterministic, we would end up with an infinite retry loop? Also, 
assuming that a client leaves the group, we cannot assign some tasks any 
longer... I would rather throw a StreamsException and let the client crash.




-Matthias

On 4/30/24 12:22 PM, Sophie Blee-Goldman wrote:

One last thing: I added an error code enum to be returned from the
#onAssignmentComputed method in case of an invalid assignment. I created
one code for each of the invalid cases we described above. The downside is
that this means we'll have to go through a deprecation cycle if we want to
loosen up the restrictions on any of the enforced cases. The upside is that
we can very clearly mark what is an invalid assignment and this will
(hopefully) assist users who are new to customizing assignments by clearly
denoting the requirements, and returning a clear error if they are not
followed.

Of course the StreamsPartitionAssignor will also do a "fallback & retry" in
this case by returning the same assignment to the consumers and scheduling
a followup rebalance. I've added all of this to the TaskAssignor  and
#onAssignmentComputed javadocs, and added a section under "Public Changes"
as well.

Please let me know if there are any concerns, or if you have suggestions
for how else we can handle an invalid assignment

On Tue, Apr 30, 2024 at 11:39 AM Sophie Blee-Goldman 
wrote:


Thanks guys! I agree with what Lucas said about 117c, we can always loosen
a restriction later and I don't want to do anything now that might get in
the way of the new threading models.

With that I think we're all in agreement on 117. I'll update the KIP to
include what we've discussed

(and will fix the remaining #finalAssignment mention as well, thanks
Bruno. Glad to have such good proof readers! :P)

On Tue, Apr 30, 2024 at 8:35 AM Bruno Cadonna  wrote:


Hi again,

I forgot to ask whether you could add the agreement about handling
invalid assignment to the KIP.

Best,
Bruno

On 4/30/24 2:00 PM, Bruno Cadonna wrote:

Hi all,

I think we are converging!

117
a) fail: Since it is an invalid consumer assignment
b) pass: I agree that not assigning a task might be reasonable in some
situations
c) fail: For the reasons Lucas pointed out. I am missing a good use

case

here.
d) fail: It is invalid


Somewhere in the KIP you still use finalAssignment() instead of the
wonderful method name onAssignmentComputed() ;-)
"... interface also includes a method named finalAssignment which is
called with the final computed GroupAssignment ..."


Best,
Bruno


On 4/30/24 1:04 PM, Lucas Brutschy wrote:

Hi,

Looks like a great KIP to me!

I'm late, so I'm only going to comment on the last open point 117. I'm
against any fallbacks like "use the default assignor if the custom
assignment is invalid", as it's just going to hide bugs. For the 4
cases mentioned by Sophie:

117a) I'd fail immediately here, as it's an implementation bug, and
should not lead to a valid consumer group assignment.
117b) Agreed. This is a useful assignment and should be allowed.
117c) This is the tricky case. However, I'm leaning towards not
allowing this, unless we have a concrete use case. This will block us
from potentially using a single consumer for active and standby tasks
in the future. It's easier to drop the restriction later if we have a
concrete use case.
117d) Definitely fail immediately, as you said.

Cheers,
Lucas



On Mon, Apr 29, 2024 at 11:13 PM Sophie Blee-Goldman
 wrote:


Yeah I think that sums it up well. Either you computed a *possible*
assignment,
or you returned something that makes it literally impossible for the
StreamsPartitionAssignor to decipher/translate into an actual group
assignment, in which case it should just fail

That's more or less it for the open questions that have been raised
so far,
so I just want to remind folks that there's already a voting thread

for

this. I cast my vote a few minutes ago so it should resurface in
everyone's
inbox :)

On Thu, Apr 25, 2024 at 11:42 PM Rohan Desai 


wrote:


117: as Sophie laid out, there are two cases here right:
1. cases that are considered 

[jira] [Commented] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete

2024-04-30 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842514#comment-17842514
 ] 

Matthias J. Sax commented on KAFKA-16644:
-

Sorry. Wrong link. Fixed -> https://issues.apache.org/jira/browse/KAFKA-14748 

> FK join emits duplicate tombstone on left-side delete
> -
>
> Key: KAFKA-16644
> URL: https://issues.apache.org/jira/browse/KAFKA-16644
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>    Reporter: Matthias J. Sax
>Priority: Major
>
> We introduced a regression bug in 3.7.0 release via KAFKA-14748. When a 
> left-hand side record is deleted, the join now emits two tombstone records 
> instead of one.
> The problem was not detected via unit test, because the tests use a `Map` 
> instead of a `List` when verifying the result topic records 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).]
> We should update all test cases to use `List` when reading from the output 
> topic, and of course fix the introduced bug: The 
> `SubscriptionSendProcessorSupplier` is sending two subscription records 
> instead of just a single one: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete

2024-04-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16644:

Description: 
We introduced a regression bug in 3.7.0 release via KAFKA-14748. When a 
left-hand side record is deleted, the join now emits two tombstone records 
instead of one.

The problem was not detected via unit test, because the tests use a `Map` 
instead of a `List` when verifying the result topic records 
([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).]

We should update all test cases to use `List` when reading from the output 
topic, and of course fix the introduced bug: The 
`SubscriptionSendProcessorSupplier` is sending two subscription records instead 
of just a single one: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]

 

  was:
We introduced a regression bug in 3.7.0 release via KAFKA-14778. When a 
left-hand side record is deleted, the join now emits two tombstone records 
instead of one.

The problem was not detected via unit test, because the tests use a `Map` 
instead of a `List` when verifying the result topic records 
([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).]

We should update all test cases to use `List` when reading from the output 
topic, and of course fix the introduced bug: The 
`SubscriptionSendProcessorSupplier` is sending two subscription records instead 
of just a single one: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]

 


> FK join emits duplicate tombstone on left-side delete
> -
>
> Key: KAFKA-16644
> URL: https://issues.apache.org/jira/browse/KAFKA-16644
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>        Reporter: Matthias J. Sax
>Priority: Major
>
> We introduced a regression bug in 3.7.0 release via KAFKA-14748. When a 
> left-hand side record is deleted, the join now emits two tombstone records 
> instead of one.
> The problem was not detected via unit test, because the tests use a `Map` 
> instead of a `List` when verifying the result topic records 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).]
> We should update all test cases to use `List` when reading from the output 
> topic, and of course fix the introduced bug: The 
> `SubscriptionSendProcessorSupplier` is sending two subscription records 
> instead of just a single one: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: outerjoin not joining after window

2024-04-30 Thread Matthias J. Sax

Thanks for the information. I ran the code using Kafka locally. After
submitting some records inside and outside of the time window and grace,
the join performed as expected when running locally.


That gives some hope :)




However, they never get into the join.


How do you know this?


Did you check the metric for dropped records? Maybe records are 
considered malformed and dropped? Are you using the same records in 
production and in your local test?




Are there any settings for the stream client that would affect the join?


Not that I can think of... There is one more internal config, but as 
long as data is flowing, it should not impact the result you see.




Are there any settings on the broker side that would affect the join?


No. The join is computed client side. Broker configs should not have any 
impact.



f I increase the log level for the streams API would that

shed some light on what is happening?


I don't think it would help much. The code in question is 
org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin -- but it 
does not do any logging except WARN for the already mentioned "dropping 
malformed" records that is also recorded via JMX.



WARN: "Skipping record due to null key or value. "



If you can identify a specific record from the input which would produce 
an output, but does not, maybe you can try to feed it into your local 
test env and try to re-produce the issue?



-Matthias

On 4/30/24 11:38 AM, Chad Preisler wrote:

Matthias,

Thanks for the information. I ran the code using Kafka locally. After
submitting some records inside and outside of the time window and grace,
the join performed as expected when running locally.

I'm not sure why the join is not working as expected when running against
our actual brokers. We are peeking at the records for the streams and we
are seeing the records get pulled. However, they never get into the join.
It's been over 24 hours since the expected records were created, and there
has been plenty of traffic to advance the stream time. Only records that
have both a left and right side match are getting processed by the join.

Are there any settings for the stream client that would affect the join?

Are there any settings on the broker side that would affect the join?

The outer join is just one part of the topology. Compared to running it
locally there is a lot more data going through the app when running on our
actual servers. If I increase the log level for the streams API would that
shed some light on what is happening? Does anyone know if there are
specific packages that I should increase the log level for? Any specific
log message I can hone in on to tell me what is going on?

Basically, I'm looking for some pointers on where I can start looking.

Thanks,
Chad


On Tue, Apr 30, 2024 at 10:26 AM Matthias J. Sax  wrote:


I expect the join to

execute after the 25 with one side of the join containing a record and

the

other being null


Given that you also have a grace period of 5 minutes, the result will
only be emitted after the grace-period passed and the window is closed
(not when window end time is reached).


One has a

naming convention of "KSTREAM_OUTERSHARED". I see a record there, but

I'm

not sure how to decode that message to see what is in it. What is the
purpose of those messages?


It's an internal store, that stores all records which are subject to be
emitted as left/right join result, ie, if there is no inner join result.
The format used is internal:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerde.java

Also note: time is based on event-time, ie, if the input stream stops to
send new records, "stream-time" will stop to advance and the result
might not be emitted because the window does not get closed.

(Last, there is some internal wall-clock time delay of one second to
emit results for performance reasons...)

HTH.

-Matthias

On 4/30/24 6:51 AM, Chad Preisler wrote:

Hello,

I have a KStream to KStream outer join with a time difference of 25

minutes

and 5 minutes of grace.  When I get a record for one side of the join,

but

don't get a record on the other side of the join, I expect the join to
execute after the 25 with one side of the join containing a record and

the

other being null. Is that correct?  If it is correct, it's not working

for

me.

I was poking around on the broker and saw some internal topics. I see the
key I expected to execute the join on some of those topics. One has a
naming convention of "KSTREAM_OUTERSHARED". I see a record there, but I'm
not sure how to decode that message to see what is in it. What is the
purpose of those messages? If I decode the message will it help me see

when

the join should have been executed?

I also see the key on a topic with the naming convention
"KSTREAM_OUTERTHIS".

Are there any other topics that I should be looking at to troubleshoot

this

issue?

Thanks,
Chad







Re: outerjoin not joining after window

2024-04-30 Thread Matthias J. Sax

I expect the join to

execute after the 25 with one side of the join containing a record and the
other being null


Given that you also have a grace period of 5 minutes, the result will 
only be emitted after the grace-period passed and the window is closed 
(not when window end time is reached).
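For reference, a minimal sketch of the setup being described (topic names, 
types, and the joiner are made up); with a 25-minute window and 5 minutes of 
grace, outer results can only show up once 30 minutes of event-time have 
passed:

import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

final class OuterJoinSketch {
    static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> left = builder.stream("left-topic");
        final KStream<String, String> right = builder.stream("right-topic");

        // Left/right (outer-only) results are emitted once window end plus
        // grace has passed in stream-time, i.e. 30 minutes of event-time here.
        left.outerJoin(
                right,
                (l, r) -> l + "-" + r,   // one side is null for outer-only results
                JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(25), Duration.ofMinutes(5)))
            .to("joined-topic");

        return builder.build();
    }
}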



One has a

naming convention of "KSTREAM_OUTERSHARED". I see a record there, but I'm
not sure how to decode that message to see what is in it. What is the
purpose of those messages?


It's an internal store, that stores all records which are subject to be 
emitted as left/right join result, ie, if there is no inner join result. 
The format used is internal: 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerde.java


Also note: time is based on event-time, ie, if the input stream stops to 
send new records, "stream-time" will stop to advance and the result 
might not be emitted because the window does not get closed.


(Last, there is some internal wall-clock time delay of one second to 
emit results for performance reasons...)


HTH.

-Matthias

On 4/30/24 6:51 AM, Chad Preisler wrote:

Hello,

I have a KStream to KStream outer join with a time difference of 25 minutes
and 5 minutes of grace.  When I get a record for one side of the join, but
don't get a record on the other side of the join, I expect the join to
execute after the 25 with one side of the join containing a record and the
other being null. Is that correct?  If it is correct, it's not working for
me.

I was poking around on the broker and saw some internal topics. I see the
key I expected to execute the join on some of those topics. One has a
naming convention of "KSTREAM_OUTERSHARED". I see a record there, but I'm
not sure how to decode that message to see what is in it. What is the
purpose of those messages? If I decode the message will it help me see when
the join should have been executed?

I also see the key on a topic with the naming convention
"KSTREAM_OUTERTHIS".

Are there any other topics that I should be looking at to troubleshoot this
issue?

Thanks,
Chad



[jira] [Commented] (KAFKA-16382) Kafka Streams drop NULL values after reset

2024-04-30 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842423#comment-17842423
 ] 

Matthias J. Sax commented on KAFKA-16382:
-

Not yet from our side... Working on other things atm. Not sure when we will be 
able to pick it up, or if anybody from the community wants to take it.

> Kafka Streams drop NULL values after reset
> --
>
> Key: KAFKA-16382
> URL: https://issues.apache.org/jira/browse/KAFKA-16382
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> Kafka Streams (KTable) drops null values after full reset.
> See 
> [https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java]
>  for sample topology
> Step to reproduce (req NULL-IN, NULL-IN-AUX, NULL-OUT topics)
>  # Start example - 1st round
>  # Send to NULL-IN "A1:a" -> NULL-OUT "A1:anull"
>  # Send to NULL-IN-AUX "A1:b" -> NULL-OUT "A1:anull, A1:ab"
>  # Stop application 
>  # Run kafka-streams-application-reset 
> {code:java}
> call bin/windows/kafka-streams-application-reset --application-id 
> nullproblem-example^
>  --input-topics "NULL-IN,NULL-IN-AUX"^
>  --bootstrap-server "localhost:9092"
> {code}
>  # Send to NULL-IN-AUX "A1:" -> NULL-OUT "A1:anull, A1:ab" - it is Ok (no app 
> running yet)
>  # Start example - 2nd round
>  # After initialization -> NULL-OUT *still contains* 2 messages "A1:anull, 
> A1:ab"
>  # Expected output *3 messages* "A1:anull, A1:ab, {*}A1:{*}"
> The issue is NOT reproduced if application just restarted (skip step 5). 
> The issue is NOT reproduced if internal cache is disabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16645) CVEs in 3.7.0 docker image

2024-04-30 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842419#comment-17842419
 ] 

Matthias J. Sax commented on KAFKA-16645:
-

I believe fixing these CVEs should be a blocker for 3.7.1 and 3.8.0? Thoughts?

> CVEs in 3.7.0 docker image
> --
>
> Key: KAFKA-16645
> URL: https://issues.apache.org/jira/browse/KAFKA-16645
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.7.0
>Reporter: Mickael Maison
>Priority: Blocker
> Fix For: 3.8.0, 3.7.1
>
>
> Our [Docker Image CVE 
> Scanner|https://github.com/apache/kafka/actions/runs/874393] GitHub 
> action reports 2 high CVEs in our base image:
> apache/kafka:3.7.0 (alpine 3.19.1)
> ==
> Total: 2 (HIGH: 2, CRITICAL: 0)
> Library: libexpat
> - CVE-2023-52425 (HIGH, fixed): installed 2.5.0-r2, fixed in 2.6.0-r0.
>   expat: parsing large tokens can trigger a denial of service.
>   https://avd.aquasec.com/nvd/cve-2023-52425
> - CVE-2024-28757 (HIGH, fixed): installed 2.5.0-r2, fixed in 2.6.2-r0.
>   expat: XML Entity Expansion.
>   https://avd.aquasec.com/nvd/cve-2024-28757
> Looking at the 
> [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka#KIP975:DockerImageforApacheKafka-WhatifweobserveabugoracriticalCVEinthereleasedApacheKafkaDockerImage?]
>  that introduced the docker images, it seems we should release a bugfix when 
> high CVEs are detected. It would be good to investigate and assess whether 
> Kafka is impacted or not.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16645) CVEs in 3.7.0 docker image

2024-04-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16645:

Priority: Blocker  (was: Major)

> CVEs in 3.7.0 docker image
> --
>
> Key: KAFKA-16645
> URL: https://issues.apache.org/jira/browse/KAFKA-16645
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.7.0
>Reporter: Mickael Maison
>Priority: Blocker
>
> Our [Docker Image CVE 
> Scanner|https://github.com/apache/kafka/actions/runs/874393] GitHub 
> action reports 2 high CVEs in our base image:
> apache/kafka:3.7.0 (alpine 3.19.1)
> ==
> Total: 2 (HIGH: 2, CRITICAL: 0)
> Library: libexpat
> - CVE-2023-52425 (HIGH, fixed): installed 2.5.0-r2, fixed in 2.6.0-r0.
>   expat: parsing large tokens can trigger a denial of service.
>   https://avd.aquasec.com/nvd/cve-2023-52425
> - CVE-2024-28757 (HIGH, fixed): installed 2.5.0-r2, fixed in 2.6.2-r0.
>   expat: XML Entity Expansion.
>   https://avd.aquasec.com/nvd/cve-2024-28757
> Looking at the 
> [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka#KIP975:DockerImageforApacheKafka-WhatifweobserveabugoracriticalCVEinthereleasedApacheKafkaDockerImage?]
>  that introduced the docker images, it seems we should release a bugfix when 
> high CVEs are detected. It would be good to investigate and assess whether 
> Kafka is impacted or not.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16645) CVEs in 3.7.0 docker image

2024-04-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16645:

Fix Version/s: 3.8.0
   3.7.1

> CVEs in 3.7.0 docker image
> --
>
> Key: KAFKA-16645
> URL: https://issues.apache.org/jira/browse/KAFKA-16645
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.7.0
>Reporter: Mickael Maison
>Priority: Blocker
> Fix For: 3.8.0, 3.7.1
>
>
> Our [Docker Image CVE 
> Scanner|https://github.com/apache/kafka/actions/runs/874393] GitHub 
> action reports 2 high CVEs in our base image:
> apache/kafka:3.7.0 (alpine 3.19.1)
> ==
> Total: 2 (HIGH: 2, CRITICAL: 0)
> Library: libexpat
> - CVE-2023-52425 (HIGH, fixed): installed 2.5.0-r2, fixed in 2.6.0-r0.
>   expat: parsing large tokens can trigger a denial of service.
>   https://avd.aquasec.com/nvd/cve-2023-52425
> - CVE-2024-28757 (HIGH, fixed): installed 2.5.0-r2, fixed in 2.6.2-r0.
>   expat: XML Entity Expansion.
>   https://avd.aquasec.com/nvd/cve-2024-28757
> Looking at the 
> [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka#KIP975:DockerImageforApacheKafka-WhatifweobserveabugoracriticalCVEinthereleasedApacheKafkaDockerImage?]
>  that introduced the docker images, it seems we should release a bugfix when 
> high CVEs are detected. It would be good to investigate and assess whether 
> Kafka is impacted or not.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete

2024-04-30 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842417#comment-17842417
 ] 

Matthias J. Sax commented on KAFKA-16644:
-

Thanks for the pointer. I did not have time yet to dig into the details of 
K16394... If that's the case, we can close this as duplicate; will take care of 
it after I read up on K16394.

> FK join emits duplicate tombstone on left-side delete
> -
>
> Key: KAFKA-16644
> URL: https://issues.apache.org/jira/browse/KAFKA-16644
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>        Reporter: Matthias J. Sax
>Priority: Major
>
> We introduced a regression bug in 3.7.0 release via KAFKA-14778. When a 
> left-hand side record is deleted, the join now emits two tombstone records 
> instead of one.
> The problem was not detected via unit test, because the tests use a `Map` 
> instead of a `List` when verifying the result topic records 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).]
> We should update all test cases to use `List` when reading from the output 
> topic, and of course fix the introduced bug: The 
> `SubscriptionSendProcessorSupplier` is sending two subscription records 
> instead of just a single one: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-29 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842229#comment-17842229
 ] 

Matthias J. Sax commented on KAFKA-16514:
-

Cool. You can find details on the wiki: 
[https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals] 
– if you don't get to it, also totally ok; Sophie mentioned that she might also 
be able to pick it up (but most likely not for 3.8 release...)

(Btw: the wiki account creation via self-service is currently broken, but we 
can create an account manually if you don't have one.)

Would you be interested to do a PR to update the JavaDocs in the meantime to 
fix them?

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete

2024-04-29 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16644:

Description: 
We introduced a regression bug in 3.7.0 release via KAFKA-14778. When a 
left-hand side record is deleted, the join now emits two tombstone records 
instead of one.

The problem was not detected via unit test, because the tests use a `Map` 
instead of a `List` when verifying the result topic records 
([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).]

We should update all test cases to use `List` when reading from the output 
topic, and of course fix the introduced bug: The 
`SubscriptionSendProcessorSupplier` is sending two subscription records instead 
of just a single one: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]

 

  was:
We introduced a regression bug in 3.7.0 release via KAFKA-14778. When a 
left-hand side record is deleted, the join now emits two tombstone records 
instead of one.

The problem was not detected via unit test, because the test use a `Map` 
instead of a `List` when verifying the result topic records 
([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).]

We should update all test cases to use `List` when reading from the output 
topic, and of course fix the introduces bug: The 
`SubscriptionSendProcessorSupplier` is sending two subscription records instead 
of just a single one: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]

 


> FK join emits duplicate tombstone on left-side delete
> -
>
> Key: KAFKA-16644
> URL: https://issues.apache.org/jira/browse/KAFKA-16644
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>        Reporter: Matthias J. Sax
>Priority: Major
>
> We introduced a regression bug in 3.7.0 release via KAFKA-14778. When a 
> left-hand side record is deleted, the join now emits two tombstone records 
> instead of one.
> The problem was not detected via unit test, because the tests use a `Map` 
> instead of a `List` when verifying the result topic records 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).]
> We should update all test cases to use `List` when reading from the output 
> topic, and of course fix the introduced bug: The 
> `SubscriptionSendProcessorSupplier` is sending two subscription records 
> instead of just a single one: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete

2024-04-29 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16644:

Summary: FK join emits duplicate tombstone on left-side delete  (was: FK 
join emit duplicate tombstone on left-side delete)

> FK join emits duplicate tombstone on left-side delete
> -
>
> Key: KAFKA-16644
> URL: https://issues.apache.org/jira/browse/KAFKA-16644
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>        Reporter: Matthias J. Sax
>Priority: Major
>
> We introduced a regression bug in 3.7.0 release via KAFKA-14778. When a 
> left-hand side record is deleted, the join now emits two tombstone records 
> instead of one.
> The problem was not detected via unit test, because the test use a `Map` 
> instead of a `List` when verifying the result topic records 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).]
> We should update all test cases to use `List` when reading from the output 
> topic, and of course fix the introduces bug: The 
> `SubscriptionSendProcessorSupplier` is sending two subscription records 
> instead of just a single one: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16644) FK join emit duplicate tombstone on left-side delete

2024-04-29 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16644:
---

 Summary: FK join emit duplicate tombstone on left-side delete
 Key: KAFKA-16644
 URL: https://issues.apache.org/jira/browse/KAFKA-16644
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.7.0
Reporter: Matthias J. Sax


We introduced a regression bug in 3.7.0 release via KAFKA-14778. When a 
left-hand side record is deleted, the join now emits two tombstone records 
instead of one.

The problem was not detected via unit test, because the test use a `Map` 
instead of a `List` when verifying the result topic records 
([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).]

We should update all test cases to use `List` when reading from the output 
topic, and of course fix the introduces bug: The 
`SubscriptionSendProcessorSupplier` is sending two subscription records instead 
of just a single one: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-29 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842183#comment-17842183
 ] 

Matthias J. Sax commented on KAFKA-16514:
-

I would not call it a bug, but more like a feature gap. Historically, 
consumer.close() sends a leave group request. For KS, we only added an internal 
config to the consumer to disable sending the leave group request.

Independently, static group membership was added, and a new admin API was added 
to remove a static member from a group. The purpose of this admin feature was 
(IIRC) to allow triggering a rebalance for static groups (which usually run 
with high session timeouts) in case a crashed member won't come 
back again.
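
For reference, a minimal sketch of that admin call for a static member (the 
group id and group.instance.id below are made up):
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.MemberToRemove;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;

final Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (final Admin admin = Admin.create(adminProps)) {
    // removes the static member identified by its group.instance.id and triggers a rebalance
    admin.removeMembersFromConsumerGroup(
        "my-streams-app", // the application.id, i.e., the consumer group id
        new RemoveMembersFromConsumerGroupOptions(
            Collections.singletonList(new MemberToRemove("my-instance-1")))
    ).all().get();
}
{code}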

Thus, it's two independent features that just don't play nice with each other. 
– In addition, we combine both features in KafkaStreams#close(CloseOptions), 
but given how the APIs are built, it only works for static members.

Thus, there is not really a reason; it just happens that it all was 
implemented this way given the historic context etc.

I am in favor of doing a KIP to add something similar to "CloseOption" to 
Consumer#close() (independent of static membership or not). [~sal.sorrentino] 
Would you be interested in picking it up and writing a KIP? We might still be able 
to get it into the 3.8 release if we hurry up.

[~lianetm] – what is the purpose of the -2 code? In the end, if we don't send any 
request, with a large enough session timeout no rebalance would be triggered 
anyway? What would change if we send -2 instead of just not sending any leave 
group request on close()?

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Wiki account sign-up

2024-04-29 Thread Matthias J. Sax

Hi,

as many of you know, there is an issue with creating new wiki accounts 
right now: https://issues.apache.org/jira/browse/INFRA-25451


A fix might take some more time. In the meantime, please reply to the 
INFRA ticket with your email address, and accounts will be created 
manually.



-Matthias


Re: Assistance Needed with Creating Wiki ID for Kafka Contribution

2024-04-29 Thread Matthias J. Sax
It's a known issue and INFRA is working on a solution: 
https://issues.apache.org/jira/browse/INFRA-25451


In the meantime, users can be added manually (cf. the ticket, and reply 
there to get added).


-Matthias

On 3/28/24 5:10 AM, Prashant Jagtap wrote:

Hi,

I hope this email finds you well.

I've been eager to contribute to Apache Kafka and have started following 
the procedure outlined in the Kafka Improvement Proposals (KIP) 
documentation - 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals#KafkaImprovementProposals-GettingStarted . I've successfully signed up for the Developer mailing list and created a Jira ID.
However, I've encountered a hurdle while trying to create a Wiki ID by 
following this url - https://cwiki.apache.org/confluence/signup.action. 
Despite the provided instructions, I'm unable to complete the sign-up 
process for the Wiki ID. Attached the screenshot for your reference.

Could you please assist me with the steps required to create a Wiki ID?
(I have already sent an email to infrastruct...@apache.org regarding the 
same)


Thank you for your time and support.

Thanks and Regards,
Prashant Jagtap


Re: How to find out the end of the session window

2024-04-29 Thread Matthias J. Sax

Did you look into .windowedBy(...).emitStrategy(...) ?

Using emit-final you would get a downstream event only after the window 
is closed.
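
A minimal sketch of what this could look like for the session-window case 
described below (topic names, serdes, and the inactivity gap are made up):

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.WindowedSerdes;

final StreamsBuilder builder = new StreamsBuilder();
builder.stream("batch-logs", Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey()
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5)))
    .emitStrategy(EmitStrategy.onWindowClose())
    .reduce((aggregate, log) -> aggregate + "\n" + log)
    .toStream()
    // each record below is emitted exactly once, after the session window closed
    .to("aggregated-batch-logs",
        Produced.with(WindowedSerdes.sessionWindowedSerdeFrom(String.class), Serdes.String()));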


-Matthias

On 4/29/24 1:43 AM, Santhoshi Mekala wrote:

Hi Team,

We have the below requirement:
We are processing batch logs in kstreams. Currently, we are storing the
batch logs in kafka topic after processing. We would like to integrate with
object storage to store the batch logs in object storage after processing.
For batch logs, we are using Session windows. We would like to emit a
special event when the session window is closed and based on that event, we
will aggregate all the logs related to a key and will send it to the object
storage. For this, we need to know the end of the session window. Could you
please let me know if there are ways to identify the end of the session
windows and emit a special event.

Thank you!



[jira] [Commented] (KAFKA-16584) Make log processing summary configurable or debug

2024-04-26 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841332#comment-17841332
 ] 

Matthias J. Sax commented on KAFKA-16584:
-

There is an issue with creating new accounts: 
https://issues.apache.org/jira/browse/INFRA-25451 – we are waiting for the infra 
team to resolve it. Unfortunately, we cannot do anything about it.

The only thing I can offer is: if you prepare a KIP using some other tool (e.g. 
Google Docs or similar) and share it with me, I can copy it into the wiki for 
you.

> Make log processing summary configurable or debug
> -
>
> Key: KAFKA-16584
> URL: https://issues.apache.org/jira/browse/KAFKA-16584
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.2
>Reporter: Andras Hatvani
>Assignee: dujian
>Priority: Major
>  Labels: needs-kip, newbie
>
> Currently *every two minutes for every stream thread* statistics will be 
> logged on INFO log level. 
> {code}
> 2024-04-18T09:18:23.790+02:00  INFO 33178 --- [service] [-StreamThread-1] 
> o.a.k.s.p.internals.StreamThread         : stream-thread 
> [service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 
> total records, ran 0 punctuators, and committed 0 total tasks since the last 
> update {code}
> This is absolutely unnecessary and even harmful since it fills the logs and 
> thus storage space with unwanted and useless data. Otherwise the INFO logs 
> are useful and helpful, therefore it's not an option to raise the log level 
> to WARN.
> Please make the logProcessingSummary 
> * either to a DEBUG level log or
> * make it configurable so that it can be disabled.
> This is the relevant code: 
> https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Confluence edit access

2024-04-26 Thread Matthias J. Sax

Thanks. You should be all set.

-Matthias

On 4/25/24 10:49 PM, Claude Warren wrote:

My Confluence ID is "claude"

On Thu, Apr 25, 2024 at 8:40 PM Matthias J. Sax  wrote:


What's your wiki ID? We can grant write access on our side if you have
already an account.

-Matthias

On 4/25/24 4:06 AM, Claude Warren wrote:

I would like to get edit access to the Kafka confluence so that I can

work

on KIP-936.  Can someone here do that or do I need to go through Infra?

Claude








Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-25 Thread Matthias J. Sax
atch whatever exception we throw
if the task lag computation fails and not know how to handle it. But I
suppose we can just say in the javadocs that you can/should not catch it
and/or rethrow to allow Streams to recover and re-attempt. I agree we
should have Streams just handle this transparently for users and not
require them to rebuild the assignment on their own. I'll add this to the
javadocs -- and I don't think we need to introduce a new exception type
even. We have the "TaskAssignmentException" already which behaves similarly
now -- ie if there's an error during assignment, we throw this and return
the same assignment back and schedule a followup.

So how about we (a) put in the TaskAssignor#assign javadocs that if a
TaskAssignmentException is thrown, Streams will return the same assignment
and automatically schedule an immediate followup rebalance, and (b) note in
the #kafkaStreamsStates(boolean computeTaskLags) javadocs that this can
throw a TaskAssignmentException, which should be rethrown in order to retry
the rebalance. I think this is basically what you're saying here so I'll go
ahead and update the KIP with this but lmk if there's anything else

But coming back to what I said in 108(2) -- I do think it would be valuable
for users to be able to determine their own followup rebalance schedule,
even for "immediate" followup rebalances (again, as a way to mitigate
issues like KAFKA-14419 <https://issues.apache.org/jira/browse/KAFKA-14419>).
Let's say there's an error in someone's own assignor code, or again, it's
trying to make a decision based on some custom metrics that are
unavailable. In this case I feel the utility method would still be helpful,
even if we would handle it transparently for errors in things like the task
lag computation. It's a utility method -- there for people who want to use
it, everyone else can ignore it (and most users probably will). But for
those who want ultimate control (mwahaha) it's a nice-to-have so they don't
have to do *everything* from scratch. WDYT?

117. That's fair. I agree this is probably the most tricky of the open
questions, and I'm happy to defer. Although I do strongly believe that we
shouldn't call certain kinds of assignments "invalid" (such as a task not
being assigned to anyone). For the clearly-invalid assignment cases, I'd
err on the side of not holding users hands too much for now, but again:
would be happy to defer if anyone has another suggestion and/or strong
opinion.

On Wed, Apr 24, 2024 at 10:13 PM Matthias J. Sax  wrote:


104: I also don't feel super strong about it. Not sure if
`onAssignment()` might overload the name in a confusing way? In the end,
when the method is called, we don't assign anything? -- Guess, I am fine
with whatever Rohan picks as a name from the suggestions we have so far.


107: Did not think about how to do it yet. Just raised the question to
see if I am even heading into the right direction or not... I did not
propose to remove the method; it's clear that we need it.

Thinking about it a little more, what we actually want to convey is a
certain "processing capacity" an instance has? Thus,
`numConsumerClients()` might not reflect this in the future? Should we
just generically call it `processingCapacity()` or similar and for now
explain in the JavaDocs that it maps to number of (currently running)
`StreamsThread` (currently running, because users can dynamically
add/remove them...). We can later update the JavaDocs when we have
"processing threads" and point to number of processing threads? Maybe
Lucas/Bruno can provide more input on what/how we plan the future
threading model and configs.

Nit: not sure if we need the "NOTE" section at all? If we think we want
it, maybe remove it from the KIP and we can discuss in more detail on the
PR (think it could be improved). Don't think the JavaDocs on the KIP need to
be 100% accurate to what we put into the code later.


108: I guess my question is two-fold. (1) Does user-code need to decide
to schedule a probing rebalance to begin with? Or could the
non-customizable part of `StreamsPartitionAssignor` decide it? (2) If
custom code really needs to make this decision, why would it not just
return a boolean? It seems unnecessary to compute a deadline, given that
the probing rebalance interval is a config? -- Or maybe I am missing
something? If it's about regular probing rebalance vs immediate
rebalance vs no follow up, maybe an enum would do the trick?


115: Thanks for the explanation. This does make sense. I am wondering if
we need the new utility method though? Would it not be possible to
encapsulate all this inside the non-customizable code? The method
`kafkaStreamsStates(boolean computeTaskLags)` will be provided by us and
called by the user code. Thus, if we cannot compute the lag, we could
still throw an exception -- the user code does not need to know anything
about it, and is not supposed to catch this exception

Re: Confluence edit access

2024-04-25 Thread Matthias J. Sax
What's your wiki ID? We can grant write access on our side if you have 
already an account.


-Matthias

On 4/25/24 4:06 AM, Claude Warren wrote:

I would like to get edit access to the Kafka confluence so that I can work
on KIP-936.  Can someone here do that or do I need to go through Infra?

Claude



[jira] [Commented] (KAFKA-16585) No way to forward message from punctuation method in the FixedKeyProcessor

2024-04-25 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840907#comment-17840907
 ] 

Matthias J. Sax commented on KAFKA-16585:
-

{quote}I can use the regular Processor, but as I understand it add some 
overhead comparing with FixedKeyProcessor
{quote}
Where did you get this? The Processor itself does not have overhead. – The only 
thing that could happen downstream is that an unnecessary repartition step 
could be inserted. We are tackling this via 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling]
{quote}Really, I think FixedKeyProcessor do not need to be 
"ensure that the key is not changed". IMHO there is enough to have a key from 
the same partition. So, if you will provide the way to generate the 
*FixedKeyRecord* from any local store it will be enough.
{quote}
Well, technically yes, but there is no simple way to 
enforce/check this... We would need to serialize the provided key, pipe it 
through the Partitioner, and compare the computed partition. Or is there some 
other way to do this? – This would be quite expensive to do.

If you feel strongly about all this, feel free to do a POC PR and 
write a KIP about it, and we can take it from there. I don't see a simple way 
to do it, and I believe that using a regular Processor is the right way to go 
(especially with KIP-759 on the horizon).
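
To illustrate the regular-Processor route (all names and types below are made 
up): a punctuator can construct and forward a brand-new Record, which is 
exactly what a FixedKeyRecord does not allow:
{code:java}
import java.time.Duration;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class PunctuatingProcessor implements Processor<String, String, String, String> {

    private ProcessorContext<String, String> context;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
        // emit a record from the punctuation, even though there is no incoming record to derive it from
        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp ->
            context.forward(new Record<>("some-key", "some-value", timestamp)));
    }

    @Override
    public void process(final Record<String, String> record) {
        context.forward(record);
    }
}
{code}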

> No way to forward message from punctuation method in the FixedKeyProcessor
> --
>
> Key: KAFKA-16585
> URL: https://issues.apache.org/jira/browse/KAFKA-16585
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.2
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> The FixedKeyProcessorContext can forward only FixedKeyRecord. This class 
> doesn't have a public constructor and can be created based on existing 
> records. But such record usually is absent in the punctuation method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-24 Thread Matthias J. Sax
eamThreads would benefit from relaxing this constraint.

117d. Unknown ProcessId: invalid assignment (should be enforced)
This one definitely sounds the most like a clear contract violation and in
fact, would make it impossible for the StreamsPartitionAssignor to
correctly process the returned KafkaStreamsAssignment into a set of
individual consumer assignments. So we almost certainly need to throw an
exception here/call out an error here. Maybe we can use the #onAssignment
callback to return an error code as well? WDYT?

118. Agreed, right now it's a bit hard to tell which APIs are for the user
to access, and which ones are for them to implement. Updated the KIP to
make this more clear by breaking up the classes into "User APIs" and
"Read-only APIs" (with a short description of each category).

119. Good point, added a public constructor for AssignedTask

120. Also good point, gave the #onAssignment callback a default no-op
implementation


On Wed, Apr 24, 2024 at 2:32 PM Sophie Blee-Goldman 
wrote:


Responding to Bruno first:

(1) I actually think "KafkaStreams" is exactly right here -- for the
reason you said, ultimately this is describing a literal instance of the
"KafkaStreams" class. Glad we hashed this out! (I saw that Rohan went with
StreamsClient but i also prefer KafkaStreams)

(4) Rohan is  right about what I was saying -- but I'm now realizing that
I completely misinterpreted what your concern was. Sorry for the
long-winded and ultimately irrelevant answer. I'm completely fine with
having the return type be a simple Set with additional info such as TaskId
in the AssignedTask class (and I see Rohan already made this change so
we're all good)

(5) I don't insist either way :)   ApplicationState works for me

On Fri, Apr 19, 2024 at 9:37 PM Matthias J. Sax  wrote:


One more thing. It might be good to clearly call out which interfaces a
user would implement, vs the other ones Kafka Streams implements and
the TaskAssignor only uses.

My understanding is, that users would implement `TaskAssignor`,
`TaskAssignment`, and `StreamsClientAssignment`.

For `AssignedTask` it seems that users would actually only need to
instantiate them. Should we add a public constructor?

Also wondering if we should add an empty default implementation for
`onAssignmentComputed()` as it seems not to be strictly necessary to use
this method?


-Matthias

On 4/19/24 7:30 PM, Matthias J. Sax wrote:

Great KIP. I have some minor comments/questions:


100 The KIP says: "In the future, additional plugins can use the same
partition.assignor  prefix". What does this mean?


101 (nit) The KIP says: "Note that the thread-level assignment will
remain an un-configurable internal implementation detail of the
partition assignor (see "Rejected Alternatives" for further thoughts

and

reasoning)." -- When I was reading this the first time, I did not
understand it, and it did only become clear later (eg while reading the
discussion thread). I think it would be good to be a little bit more
explicit, because this is not just some minor thing, but a core design
decision (which I, btw, support).


102 (nit/typo): taskAssignor -> TaskAssignor (somewhere in the text).


103 (nit): "new non-internal package" -> replace 'non-internal' with
'public' :)


104: Method name `TaskAssignor#onAssignmentComputed()` -> the name

seems

to be a little bit clumsy? I kinda like the original

`finalAssignment()`

-- I would also be happy with `onFinalAssignment` to address Bruno's
line of thinking (which I think is a good call out). (Btw:
`finalAssignment` is still used in the text on the KIP and should also
be updated.)


105: Please remove all `private` variables. We should only show public
stuff on the KIP. Everything else is an implementation detail.


106: `TaskAssignment#numStreamsClients()` -- why do we need this

method?

Seems calling `assignment()` gives as a collection and we can just call
size() on it to get the same value? -- Also, why do we explicitly call
out the overwrite of `toString()`; seems unnecessary?


107 `StreamsClientState#numStreamThreads` JavaDocs says: "Returns the
number of StreamThreads on this client, which is equal to the number of
main consumers and represents its overall capacity." -- Given our
planned thread refactoring, this might not hold correct for long (and I
am sure we will forget to updated the JavaDocs later). Talking to Lucas
the plan is to cut down `StreamsThread` to host the consumer (and there
will be only one, and it won't be configurable any longer), and we

would

introduce a number of configurable "processing threads". Can/should we
build this API in a forward looking manner?


108: Why do we need
`StreamsClientAssignment#followupRebalanceDeadline()` -- wondering how
this would be useful?


109 `StreamsClientState#consumers`: should we rename this to
`#consumerClientIds()`?


110 (nit) `StreamsClie

[jira] [Commented] (KAFKA-16584) Make log processing summary configurable or debug

2024-04-24 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840639#comment-17840639
 ] 

Matthias J. Sax commented on KAFKA-16584:
-

Yes, it does require writing code. – It also requires writing a KIP: 
[https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals] 

> Make log processing summary configurable or debug
> -
>
> Key: KAFKA-16584
> URL: https://issues.apache.org/jira/browse/KAFKA-16584
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.2
>Reporter: Andras Hatvani
>Priority: Major
>  Labels: needs-kip, newbie
>
> Currently *every two minutes for every stream thread* statistics will be 
> logged on INFO log level. 
> {code}
> 2024-04-18T09:18:23.790+02:00  INFO 33178 --- [service] [-StreamThread-1] 
> o.a.k.s.p.internals.StreamThread         : stream-thread 
> [service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 
> total records, ran 0 punctuators, and committed 0 total tasks since the last 
> update {code}
> This is absolutely unnecessary and even harmful since it fills the logs and 
> thus storage space with unwanted and useless data. Otherwise the INFO logs 
> are useful and helpful, therefore it's not an option to raise the log level 
> to WARN.
> Please make the logProcessingSummary 
> * either to a DEBUG level log or
> * make it configurable so that it can be disabled.
> This is the relevant code: 
> https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [ANNOUNCE] New committer: Igor Soarez

2024-04-24 Thread Matthias J. Sax

Congrats!

On 4/24/24 2:29 PM, Bill Bejeck wrote:

Congrats Igor!

-Bill

On Wed, Apr 24, 2024 at 2:37 PM Tom Bentley  wrote:


Congratulations Igor!

On Thu, 25 Apr 2024 at 6:27 AM, Chia-Ping Tsai  wrote:


Congratulations, Igor! you are one of the best Kafka developers!!!

Mickael Maison  於 2024年4月25日 週四 上午2:16寫道:


Congratulations Igor!

On Wed, Apr 24, 2024 at 8:06 PM Colin McCabe 

wrote:


Hi all,

The PMC of Apache Kafka is pleased to announce a new Kafka committer,

Igor Soarez.


Igor has been a Kafka contributor since 2019. In addition to being a

regular contributor and reviewer, he has made significant contributions

to

improving Kafka's JBOD support in KRaft mode. He has also contributed

to

discussing and reviewing many KIPs such as KIP-690, KIP-554, KIP-866,

and

KIP-938.


Congratulations, Igor!

Thanks,

Colin (on behalf of the Apache Kafka PMC)










[jira] [Commented] (KAFKA-16584) Make log processing summary configurable or debug

2024-04-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840232#comment-17840232
 ] 

Matthias J. Sax commented on KAFKA-16584:
-

I would personally prefer to make it configurable. Should be a more or less 
simple change, but requires a KIP.

> Make log processing summary configurable or debug
> -
>
> Key: KAFKA-16584
> URL: https://issues.apache.org/jira/browse/KAFKA-16584
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.2
>Reporter: Andras Hatvani
>Priority: Major
>  Labels: needs-kip, newbie
>
> Currently *every two minutes for every stream thread* statistics will be 
> logged on INFO log level. 
> {code}
> 2024-04-18T09:18:23.790+02:00  INFO 33178 --- [service] [-StreamThread-1] 
> o.a.k.s.p.internals.StreamThread         : stream-thread 
> [service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 
> total records, ran 0 punctuators, and committed 0 total tasks since the last 
> update {code}
> This is absolutely unnecessary and even harmful since it fills the logs and 
> thus storage space with unwanted and useless data. Otherwise the INFO logs 
> are useful and helpful, therefore it's not an option to raise the log level 
> to WARN.
> Please make the logProcessingSummary 
> * either to a DEBUG level log or
> * make it configurable so that it can be disabled.
> This is the relevant code: 
> https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16584) Make log processing summary configurable or debug

2024-04-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16584:

Labels: needs-kip newbie  (was: )

> Make log processing summary configurable or debug
> -
>
> Key: KAFKA-16584
> URL: https://issues.apache.org/jira/browse/KAFKA-16584
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.2
>Reporter: Andras Hatvani
>Priority: Major
>  Labels: needs-kip, newbie
>
> Currently *every two minutes for every stream thread* statistics will be 
> logged on INFO log level. 
> {code}
> 2024-04-18T09:18:23.790+02:00  INFO 33178 --- [service] [-StreamThread-1] 
> o.a.k.s.p.internals.StreamThread         : stream-thread 
> [service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 
> total records, ran 0 punctuators, and committed 0 total tasks since the last 
> update {code}
> This is absolutely unnecessary and even harmful since it fills the logs and 
> thus storage space with unwanted and useless data. Otherwise the INFO logs 
> are useful and helpful, therefore it's not an option to raise the log level 
> to WARN.
> Please make the logProcessingSummary 
> * either to a DEBUG level log or
> * make it configurable so that it can be disabled.
> This is the relevant code: 
> https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-04-23 Thread Matthias J. Sax

Thanks Alieh!

A few nits:


1) The new config we add for the producer should be mentioned in the 
"Public Interfaces" section.


2) Why do we use `producer.` prefix for a *producer* config? Should it 
be `exception.handler` only?



-Matthias

On 4/22/24 7:38 AM, Alieh Saeedi wrote:

Thank you all for the feedback!

Addressing the main concern: The KIP is about giving the user the ability
to handle producer exceptions, but to be more conservative and avoid future
issues, we decided to be limited to a short list of exceptions. I included
*RecordTooLargeException* and *UnknownTopicOrPartitionException*. Open to
suggestions for adding some more ;-)

KIP Updates:
- clarified the way that the user should configure the Producer to use the
custom handler. I think adding a producer config property is the cleanest
one.
- changed the *ClientExceptionHandler* to *ProducerExceptionHandler* to be
closer to what we are changing.
- added the ProducerRecord as the input parameter of the handle() method as
well.
- increased the response types to 3 to have fail and two types of continue.
- The default behaviour is having no custom handler, having the
corresponding config parameter set to null. Therefore, the KIP provides no
default implementation of the interface.
- We follow the interface solution as described in the
Rejected Alternatives section.


Cheers,
Alieh


On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax  wrote:


Thanks for the KIP Alieh! It addresses an important case for error
handling.

I agree that using this handler would be an expert API, as mentioned by
a few people. But I don't think it would be a reason to not add it. It's
always a tricky tradeoff what to expose to users and to avoid foot guns,
but we added similar handlers to Kafka Streams, and have good experience
with it. Hence, I understand, but don't share the concern raised.

I also agree that there is some responsibility by the user to understand
how such a handler should be implemented to not drop data by accident.
But it seems unavoidable and acceptable.

While I understand that a "simpler / reduced" API (eg via configs) might
also work, I personally prefer a full handler. Configs have the same
issue that they could be misused, potentially leading to incorrectly
dropped data, but at the same time are less flexible (and thus maybe
even harder to use correctly...?). Based on my experience, there are also
often weird corner cases for which it makes sense to also drop records for
other exceptions, and a full handler has the advantage of full
flexibility and "absolute power!".

To be fair: I don't know the exact code paths of the producer in
detail, so please keep me honest. But my understanding is that the KIP
aims to allow users to react to internal exceptions, and decide to keep
retrying internally, swallow the error and drop the record, or raise the
error?

Maybe the KIP would need to be a little bit more precise about what errors we
want to cover -- I don't think this list must be exhaustive, as we can
always do a follow-up KIP to also apply the handler to other errors to
expand the scope of the handler. The KIP does mention examples, but it
might be good to explicitly state for what cases the handler gets applied?

I am also not sure if CONTINUE and FAIL are enough options? Don't we
need three options? Or would `CONTINUE` have a different meaning depending
on the type of error? Ie, for a retryable error `CONTINUE` would mean
keep retrying internally, but for a non-retryable error `CONTINUE` means
swallow the error and drop the record? This semantic overload seems
tricky for users to reason about, so it might be better to split `CONTINUE`
into two cases -> `RETRY` and `SWALLOW` (or some better names).
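
Just to illustrate the three-way split (the names below are made up for
illustration and are not meant to be the KIP's actual interface):

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

// illustrative only -- not the KIP's actual interface or names
enum HandlerResponse {
    FAIL,    // raise the error to the application
    RETRY,   // keep retrying internally (retriable errors)
    SWALLOW  // drop the record and continue (non-retriable errors)
}

interface IllustrativeProducerExceptionHandler {
    HandlerResponse handle(ProducerRecord<?, ?> record, Exception exception);
}

// example: drop too-large records, keep retrying on unknown topic/partition, fail otherwise
class ExampleHandler implements IllustrativeProducerExceptionHandler {
    @Override
    public HandlerResponse handle(final ProducerRecord<?, ?> record, final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return HandlerResponse.SWALLOW;
        }
        if (exception instanceof UnknownTopicOrPartitionException) {
            return HandlerResponse.RETRY;
        }
        return HandlerResponse.FAIL;
    }
}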

Additionally, should we just ship a `DefaultClientExceptionHandler`
which would return `FAIL`, for backward compatibility. Or don't have any
default handler to begin with and allow it to be `null`? I don't see the
need for a specific `TransactionExceptionHandler`. To me, the goal
should be to not modify the default behavior at all, but to just allow
users to change the default behavior if there is a need.

What is missing in the KIP though is how the handler is passed into the
producer. Would we need a new config which allows setting a
custom handler? And/or would we allow passing in an instance via the
constructor or add a new method to set a handler?


-Matthias

On 4/18/24 10:02 AM, Andrew Schofield wrote:

Hi Alieh,
Thanks for the KIP.

Exception handling in the Kafka producer and consumer is really not

ideal.

It’s even harder working out what’s going on with the consumer.

I’m a bit nervous about this KIP and I agree with Chris that it could do

with additional

motivation. This would be an expert-level interface given how complicated
the exception handling for Kafka has become.

7. The application is not really aware of the batching being done on its

behalf.

The ProduceResponse can actually return an array 

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-23 Thread Matthias J. Sax
Thanks for splitting out this KIP. The discussion shows that it is a 
complex beast by itself, so it is worth discussing on its own.



Couple of question / comment:


100 `StateStore#commit()`: The JavaDoc says "must not be called by 
users" -- I would propose to put a guard in place for this, by either 
throwing an exception (preferable) or adding a no-op implementation (at 
least for our own stores, by wrapping them -- we cannot enforce it for 
custom stores I assume), and document this contract explicitly.
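
Something along these lines is what I have in mind -- purely illustrative, the 
class name is made up and the commit(Map) signature just follows the KIP draft:

import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateStore;

// illustrative only: a wrapper handed to user code so that commit() cannot be
// invoked from processors; only the runtime commits the wrapped store
public class UserGuardedStore {

    private final StateStore inner;

    public UserGuardedStore(final StateStore inner) {
        this.inner = inner;
    }

    public String name() {
        return inner.name();
    }

    public void commit(final Map<TopicPartition, Long> changelogOffsets) {
        throw new UnsupportedOperationException(
            "commit() is managed by the Kafka Streams runtime and must not be called by users");
    }
}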



101 adding `.position` to the store: Why do we actually need this? The 
KIP says "To ensure consistency with the committed data and changelog 
offsets" but I am not sure if I can follow? Can you elaborate why 
leaving the `.position` file as-is won't work?



If it's possible at all, it will need to be done by
creating temporary StateManagers and StateStores during rebalance. I think
it is possible, and probably not too expensive, but the devil will be in
the detail.


This sounds like a significant overhead to me. We know that opening a 
single RocksDB takes about 500ms, and thus opening RocksDB to get this 
information might slow down rebalances significantly.



102: It's unclear to me how `.position` information is added. The KIP 
only says: "position offsets will be stored in RocksDB, in the same 
column family as the changelog offsets". Do you intend to add this 
information to the map passed via `commit(final Map<TopicPartition, 
Long> changelogOffsets)`? The KIP should describe this in more detail. 
Also, if my assumption is correct, we might want to rename the parameter 
and also have a better JavaDoc description?



103: Should we make it mandatory (long-term) that all stores (including 
custom stores) manage their offsets internally? Maintaining both options 
and thus both code paths puts a burden on everyone and makes the code 
messy. I would strongly prefer if we could have a mid-term path to get rid 
of supporting both.  -- For this case, we should deprecate the newly 
added `managesOffsets()` method right away, to point out that we intend 
to remove it. If it's mandatory to maintain offsets for stores, we won't 
need this method any longer. In-memory stores can just return null from 
#committedOffset().



104 "downgrading": I think it might be worth to add support for 
downgrading w/o the need to wipe stores? Leveraging `upgrade.from` 
parameter, we could build a two rolling bounce downgrade: (1) the new 
code is started with `upgrade.from` set to a lower version, telling the 
runtime to do the cleanup on `close()` -- (ie, ensure that all data is 
written into `.checkpoint` and `.position` file, and the newly added CL 
is deleted). In a second, rolling bounce, the old code would be able to 
open RocksDB. -- I understand that this implies much more work, but 
downgrades seem to be common enough that it might be worth it? Even if 
we did not always support this in the past, we have to face the fact 
that KS is getting more and more adopted and as a more mature product 
should support this?





-Matthias







On 4/21/24 11:58 PM, Bruno Cadonna wrote:

Hi all,

How should we proceed here?

1. with the plain .checkpoint file
2. with a way to use the state store interface on unassigned but locally 
existing task state


While I like option 2, I think option 1 is less risky and will give us 
the benefits of transactional state stores sooner. We should consider 
the interface approach afterwards, though.



Best,
Bruno



On 4/17/24 3:15 PM, Bruno Cadonna wrote:

Hi Nick and Sophie,

I think the task ID is not enough to create a state store that can 
read the offsets of non-assigned tasks for lag computation during 
rebalancing. The state store also needs the state directory so that it 
knows where to find the information that it needs to return from 
changelogOffsets().


In general, I think we should proceed with the plain .checkpoint file 
for now and iterate back to the state store solution later since it 
seems it is not that straightforward. Alternatively, Nick could 
timebox an effort to better understand what would be needed for the 
state store solution. Nick, let us know your decision.


Regarding your question about the state store instance. I am not too 
familiar with that part of the code, but I think the state store is 
build when the processor topology is build and the processor topology 
is build per stream task. So there is one instance of processor 
topology and state store per stream task. Try to follow the call in [1].


Best,
Bruno

[1] 
https://github.com/apache/kafka/blob/f52575b17225828d2ff11996030ab7304667deab/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java#L153




On 4/16/24 8:59 PM, Nick Telford wrote:

That does make sense. The one thing I can't figure out is how per-Task
StateStore instances are constructed.

It looks like we construct one StateStore instance for the whole 
Topology
(in InternalTopologyBuilder), and pass that into 

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-22 Thread Matthias J. Sax
Thu, 11 Apr 2024 at 11:56, Bruno Cadonna  wrote:


Hi Matthias,


1.a
With processor node ID, I mean the ID that is exposed in the tags of
processor node metrics. That ID cannot be internal since it is exposed
in metrics. I think the processor name and the processor node ID is the
same thing. I followed how the processor node ID is set in metrics and I
ended up in addProcessor(name, ...).


1.b
Regarding ProcessingContext, I also thought about a separate class to
pass-in context information into the handler, but then I dismissed the
idea because I thought I was overthinking it. Apparently, I was not
overthinking it if you also had the same idea. So let's consider a
separate class.


4.
Regarding the metric, thanks for pointing to the dropped-record metric,
Matthias. The dropped-record metric is used with the deserialization
handler and the production handler. So, it would make sense to also use
it for this handler. However, the dropped-record metric only records
records that are skipped by the handler and not the number of calls to
the handler. But that difference is probably irrelevant since in case of
FAIL, the metric will be reset anyways since the stream thread will be
restarted. In conclusion, I think the dropped-record metric in
combination with a warn log message might be the better choice to
introducing a new metric.


8.
Regarding the DSL, I think we should close possible gaps in a separate

KIP.



Best,
Bruno

On 4/11/24 12:06 AM, Matthias J. Sax wrote:

Thanks for the KIP. Great discussion.

I am not sure if I understand the proposal from Bruno to hand in the
processor node id? Isn't this internal (could not even find it

quickly).

We do have a processor name, right? Or do I mix up something?

Another question is about `ProcessingContext` -- it contains a lot of
(potentially irrelevant?) metadata. We should think carefully about

what

we want to pass in and what not -- removing stuff is hard, but adding
stuff is easy. It's always an option to create a new interface that

only

exposes stuff we find useful, and allows us to evolve this interface
independent of others. Re-using an existing interface always has the
danger to introduce an undesired coupling that could bite us in the
future. -- It make total sense to pass in `RecordMetadata`, but
`ProcessingContext` (even if already limited compared to
`ProcessorContext`) still seems to be too broad? For example, there is
`getStateStore()` and `schedule()` methods which I think we should not
expose.

The other interesting question is about "what record gets passed in".
For the PAPI, passing in the Processor's input record make a lot of
sense. However, for DSL operators, I am not 100% sure? The DSL often
uses internal types not exposed to the user, and thus I am not sure if
users could write useful code for this case? -- In general, I still
agree that the handler should be implement with a try-catch around
`Processor.process()` but it might not be too useful for DSL processor.
Hence, I am wondering if we need to so something more in the DSL? I
don't have a concrete proposal (a few high level ideas only) and if we
don't do anything special for the DSL I am ok with moving forward with
this KIP as-is, but we should be aware of potential limitations for DSL
users. We can always do a follow up KIP to close gaps when we

understand

the impact better -- covering the DSL would also expand the scope of
this KIP significantly...

About the metric: just to double check. Do we think it's worth to add a
new metric? Or could we re-use the existing "dropped record metric"?



-Matthias


On 4/10/24 5:11 AM, Sebastien Viale wrote:

Hi,

You are right, it will simplify types.

We update the KIP

regards

Sébastien *VIALE***

*MICHELIN GROUP* - InfORMATION Technology
*Technical Expert Kafka*

Carmes / Bâtiment A17 4e / 63100 Clermont-Ferrand





*De :* Bruno Cadonna 
*Envoyé :* mercredi 10 avril 2024 10:38
*À :* dev@kafka.apache.org 
*Objet :* [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception
handler for exceptions occuring during processing
Warning External sender Do not click on any links or open any
attachments unless you trust the sender and know the content is safe.

Hi Loïc, Damien, and Sébastien,

Great that we are converging!


3.
Damien and Loïc, I think in your examples the handler will receive
Record because an Record is passed to
the processor in the following code line:


https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152
<
https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152




I see that we do not need to pass into the handler a

Record
byte[]> just because we do that for the

DeserializationExceptionHa

[jira] [Commented] (KAFKA-16585) No way to forward message from punctuation method in the FixedKeyProcessor

2024-04-22 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839910#comment-17839910
 ] 

Matthias J. Sax commented on KAFKA-16585:
-

Well, the use-case makes sense, but the question is, how can the runtime ensure 
that the key is not changed? The idea of `FixedKeyProcessor` is to ensure that 
the key is not changed, but if we allow setting a key, you could set anything 
and the runtime cannot ensure that the key is "correct". It would be up to the 
user code to do the right thing... What is unclear from your use-case 
description is why you can't use a regular `Processor`?

> No way to forward message from punctuation method in the FixedKeyProcessor
> --
>
> Key: KAFKA-16585
> URL: https://issues.apache.org/jira/browse/KAFKA-16585
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.2
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> The FixedKeyProcessorContext can forward only FixedKeyRecord. This class 
> doesn't have a public constructor and can be created based on existing 
> records. But such record usually is absent in the punctuation method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16567) Add New Stream Metrics based on KIP-869

2024-04-22 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839907#comment-17839907
 ] 

Matthias J. Sax commented on KAFKA-16567:
-

Thanks Bruno – makes sense to me – should we move K16336 out of the 4.0 ticket, 
and into the 5.0 one? If we enable the state updater in 3.8, we need to honor our 
deprecation period and can only remove the metrics in 5.0, right?

For this ticket, I'll link it to the KIP and update it accordingly. It's not a 
blocker for 4.0.

> Add New Stream Metrics based on KIP-869
> ---
>
> Key: KAFKA-16567
> URL: https://issues.apache.org/jira/browse/KAFKA-16567
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Walter Hernandez
>Priority: Blocker
>  Labels: kip
> Fix For: 4.0.0
>
>
> Add the following metrics to the state updater:
>  * restoring-active-tasks: count
>  * restoring-standby-tasks: count
>  * paused-active-tasks: count
>  * paused-standby-tasks: count
>  * idle-ratio: percentage
>  * restore-ratio: percentage
>  * checkpoint-ratio: percentage
>  * restore-records-total: count
>  * restore-records-rate: rate
>  * restore-call-rate: rate



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16567) Add New Stream Metrics based on KIP-869

2024-04-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16567:

Priority: Major  (was: Blocker)

> Add New Stream Metrics based on KIP-869
> ---
>
> Key: KAFKA-16567
> URL: https://issues.apache.org/jira/browse/KAFKA-16567
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Walter Hernandez
>Priority: Major
>  Labels: kip
>
> Add the following metrics to the state updater:
>  * restoring-active-tasks: count
>  * restoring-standby-tasks: count
>  * paused-active-tasks: count
>  * paused-standby-tasks: count
>  * idle-ratio: percentage
>  * restore-ratio: percentage
>  * checkpoint-ratio: percentage
>  * restore-records-total: count
>  * restore-records-rate: rate
>  * restore-call-rate: rate



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16567) Add New Stream Metrics based on KIP-869

2024-04-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16567:

Fix Version/s: (was: 4.0.0)

> Add New Stream Metrics based on KIP-869
> ---
>
> Key: KAFKA-16567
> URL: https://issues.apache.org/jira/browse/KAFKA-16567
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Walter Hernandez
>Priority: Blocker
>  Labels: kip
>
> Add the following metrics to the state updater:
>  * restoring-active-tasks: count
>  * restoring-standby-tasks: count
>  * paused-active-tasks: count
>  * paused-standby-tasks: count
>  * idle-ratio: percentage
>  * restore-ratio: percentage
>  * checkpoint-ratio: percentage
>  * restore-records-total: count
>  * restore-records-rate: rate
>  * restore-call-rate: rate



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-22 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839905#comment-17839905
 ] 

Matthias J. Sax commented on KAFKA-16514:
-

Well, in general we can, but the internal flag / config we set on the consumer 
is immutable – configs are in general immutable, so it's not something we can 
just change. Thus, to get some flexibility into `close()` we need a KIP to 
change the consumer.

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Failed to initialize processor KSTREAM-AGGREGATE-0000000001

2024-04-21 Thread Matthias J. Sax
Not sure either, but it sounds like a bug to me. Can you reproduce this 
reliably? What version are you using?


It would be best if you could file a Jira ticket and we can take it from 
there.



-Matthias

On 4/21/24 5:38 PM, Penumarthi Durga Prasad Chowdary wrote:

Hi,
I have an issue in kafka-streams while constructing kafka-streams state
store windows (TimeWindow and SessionWindow). While kafka-streams is
processing data, the kafka-streams process sometimes intermittently throws the
below error
ThreadName:
kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3-StreamThread-9
TraceID: unknown  CorelationID: eff36722-1430-4ffb-bf2e-c6e6cf6ae164
  Message: stream-client [ kafka-streams-exec-0-test-store
-6d676cf0-3910-4c25-bfad-ea2b98953db3] Replacing thread in the streams
uncaught exception handler
org.apache.kafka.streams.errors.StreamsException: failed to initialize
processor KSTREAM-AGGREGATE-01
   at
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:115)
   at
org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:986)
   at
org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:271)
   at
org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:716)
   at
org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:901)
   at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:778)
   at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
   at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
  Caused by: java.lang.NullPointerException
   at
org.apache.kafka.streams.kstream.internals.TimestampedTupleForwarder.<init>(TimestampedTupleForwarder.java:46)
   at
org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.init(KStreamSessionWindowAggregate.java:138)
   at
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:107)
   ... 7 more
Here my understanding is state-store is null and at that time
stateStore.flush() gets invoked to send the data to stateStore, this leads
to the above error. This error can be caught inside kafka-streams
setUncaughtExceptionHandler.
   streams.setUncaughtExceptionHandler(throwable -> {
   LOGGER.error("Exception in streams", throwable);
   return
StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
   });
I'm uncertain about the exact reason for this issue. Everything seems to be
in order, including the Kafka cluster, and there are no errors in the Kafka
Streams except for a few logs indicating node disconnections.
Is there a better way to handle this error?
When can this issue happen ?
I would like to express my gratitude in advance for any assistance provided.


[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-21 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839441#comment-17839441
 ] 

Matthias J. Sax commented on KAFKA-16514:
-

You are right that there is always a member-id etc – I am not sure though if 
generating a random group.instance.id would be the right way forward.

Maybe making the Consumer#close() call flexible and allowing users to pass in a 
CloseOption similar to what we do in KafkaStreams would be the cleaner 
approach? An alternative might be (not sure what the exact scope would be) to 
add a new AdminClient method that allows passing in a `member.id` to remove a 
consumer from the group?

Another question is: is this "hack" we put into KS to not send a leave group 
request still relevant? A lot of things got improved on the rebalance protocol 
over the years, and it might not be necessary any longer?

Curious to hear what [~ableegoldman] and [~cadonna] think.

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16567) Add New Stream Metrics based on KIP-869

2024-04-21 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839440#comment-17839440
 ] 

Matthias J. Sax commented on KAFKA-16567:
-

I see – this raises a few questions... Given that KIP-869 is not fully 
implemented yet, and the new metrics are not added, I am wondering if we can 
consider the other metric effectively deprecated or not?

[~cadonna] WDYT? Should we push out KAFKA-16336 to the 5.0 release?

> Add New Stream Metrics based on KIP-869
> ---
>
> Key: KAFKA-16567
> URL: https://issues.apache.org/jira/browse/KAFKA-16567
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Walter Hernandez
>Assignee: Walter Hernandez
>Priority: Blocker
>  Labels: kip
> Fix For: 4.0.0
>
>
> Add the following metrics to the state updater:
>  * restoring-active-tasks: count
>  * restoring-standby-tasks: count
>  * paused-active-tasks: count
>  * paused-standby-tasks: count
>  * idle-ratio: percentage
>  * restore-ratio: percentage
>  * checkpoint-ratio: percentage
>  * restore-records-total: count
>  * restore-records-rate: rate
>  * restore-call-rate: rate



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-19 Thread Matthias J. Sax
One more thing. It might be good to clearly call out which interfaces a 
user would implement, vs the other ones Kafka Streams implements and the 
TaskAssignor only uses.


My understanding is, that users would implement `TaskAssignor`, 
`TaskAssignment`, and `StreamsClientAssignment`.


For `AssignedTask` it seems that users would actually only need to 
instantiate them. Should we add a public constructor?


Also wondering if we should add an empty default implementation for 
`onAssignmentComputed()` as it seems not to be strictly necessary to use 
this method?



-Matthias

On 4/19/24 7:30 PM, Matthias J. Sax wrote:

Great KIP. I have some minor comments/questions:


100 The KIP says: "In the future, additional plugins can use the same 
partition.assignor  prefix". What does this mean?



101 (nit) The KIP says: "Note that the thread-level assignment will 
remain an un-configurable internal implementation detail of the 
partition assignor (see "Rejected Alternatives" for further thoughts and 
reasoning)." -- When I was reading this the first time, I did not 
understand it, and it did only become clear later (eg while reading the 
discussion thread). I think it would be good to be a little bit more 
explicit, because this is not just some minor thing, but a core design 
decision (which I, btw, support).



102 (nit/typo): taskAssignor -> TaskAssignor (somewhere in the text).


103 (nit): "new non-internal package" -> replace 'non-internal' with 
'public' :)



104: Method name `TaskAssignor#onAssignmentComputed()` -> the name seems 
to be a little bit clumsy? I kinda like the original `finalAssignment()` 
-- I would also be happy with `onFinalAssignment` to address Bruno's 
line of thinking (which I think is a good call out). (Btw: 
`finalAssignment` is still used in the text on the KIP and should also 
be updated.)



105: Please remove all `private` variables. We should only show public 
stuff on the KIP. Everything else is an implementation detail.



106: `TaskAssignment#numStreamsClients()` -- why do we need this method? 
Seems calling `assignment()` gives us a collection and we can just call 
size() on it to get the same value? -- Also, why do we explicitly call 
out the overwrite of `toString()`; seems unnecessary?



107 `StreamsClientState#numStreamThreads` JavaDocs says: "Returns the 
number of StreamThreads on this client, which is equal to the number of 
main consumers and represents its overall capacity." -- Given our 
planned thread refactoring, this might not hold true for long (and I 
am sure we will forget to update the JavaDocs later). Talking to Lucas 
the plan is to cut down `StreamsThread` to host the consumer (and there 
will be only one, and it won't be configurable any longer), and we would 
introduce a number of configurable "processing threads". Can/should we 
build this API in a forward looking manner?



108: Why do we need 
`StreamsClientAssignment#followupRebalanceDeadline()` -- wondering how 
this would be useful?



109 `StreamsClientState#consumers`: should we rename this to 
`#consumerClientIds()`?



110 (nit) `StreamsClientState#previousl[Active|Standby]Tasks`: JavaDoc 
says 'owned by consumers on this node' -- Should we just say `owned by 
the Streams client`?



111 `StreamsClientState#prevTasksByLag()`: it takes a `String consumer` 
parameter -- not clear what this is -- I guess it's a consumer's 
client.id? If yes, should we rename the parameter `consumerClientId`?



112 `ApplicationState`: what is the reason to have `allTasks()` and 
`statefulTasks()` -- why not have `statelessTasks()` and 
`statefulTasks()` instead? Or all three?



113 `ApplicationState#computeTaskLags()`: I understand the intent/reason 
why we have this one, but it seems to be somewhat difficult to use 
correctly, as it triggers an internal side-effect... Would it be 
possible to replace this method in favor of passing in a `boolean 
computeTaskLag` parameter into #streamClientState() instead, what might 
make it less error prone to use, as it seems the returned 
`StreamsClient` object would be modified when calling #computeTaskLags() 
and thus both are related to each other?



114 nit/typo: `ApplicationState#streamsClientStates()` returns 
`StreamsClientState` not `StreamsClient`.



115 `StreamsAssignorRetryableException`: not sure if I fully understand 
the purpose of this exception.



116 "No actual changes to functionality": allowing to plug in customer 
TaskAssignor sounds like adding new functionality. Can we rephrase this?




117: What happens if the returned assignment is "invalid" -- for 
example, a task might not have been assigned, or is assigned to two 
nodes? Or a standby is assigned to the same node as its active? Or a 
`StreamsClientAssignment` returns an unknown `ProcessId`? (Not sure if 
this list of potential issues is complete or not...)




-Matthias



On 4/18/24 2:05 AM, Bruno 

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-19 Thread Matthias J. Sax

Great KIP. I have some minor comments/questions:


100 The KIP says: "In the future, additional plugins can use the same 
partition.assignor  prefix". What does this mean?



101 (nit) The KIP says: "Note that the thread-level assignment will 
remain an un-configurable internal implementation detail of the 
partition assignor (see "Rejected Alternatives" for further thoughts and 
reasoning)." -- When I was reading this the first time, I did not 
understand it, and it did only become clear later (eg while reading the 
discussion thread). I think it would be good to be a little bit more 
explicit, because this is not just some minor thing, but a core design 
decision (which I, btw, support).



102 (nit/typo): taskAssignor -> TaskAssignor (somewhere in the text).


103 (nit): "new non-internal package" -> replace 'non-internal' with 
'public' :)



104: Method name `TaskAssignor#onAssignmentComputed()` -> the name seems 
to be a little bit clumsy? I kinda like the original `finalAssignment()` 
-- I would also be happy with `onFinalAssignment` to address Bruno's 
line of thinking (which I think is a good call out). (Btw: 
`finalAssignment` is still used in the text on the KIP and should also 
be updated.)



105: Please remove all `private` variables. We should only show public 
stuff on the KIP. Everything else is an implementation detail.



106: `TaskAssignment#numStreamsClients()` -- why do we need this method? 
Seems calling `assignment()` gives us a collection and we can just call 
size() on it to get the same value? -- Also, why do we explicitly call 
out the overwrite of `toString()`; seems unnecessary?



107 `StreamsClientState#numStreamThreads` JavaDocs says: "Returns the 
number of StreamThreads on this client, which is equal to the number of 
main consumers and represents its overall capacity." -- Given our 
planned thread refactoring, this might not hold true for long (and I 
am sure we will forget to update the JavaDocs later). Talking to Lucas 
the plan is to cut down `StreamsThread` to host the consumer (and there 
will be only one, and it won't be configurable any longer), and we would 
introduce a number of configurable "processing threads". Can/should we 
build this API in a forward looking manner?



108: Why do we need 
`StreamsClientAssignment#followupRebalanceDeadline()` -- wondering how 
this would be useful?



109 `StreamsClientState#consumers`: should we rename this to 
`#consumerClientIds()`?



110 (nit) `StreamsClientState#previousl[Active|Standby]Tasks`: JavaDoc 
says 'owned by consumers on this node' -- Should we just say `owned by 
the Streams client`?



111 `StreamsClientState#prevTasksByLag()`: it takes a `String consumer` 
parameter -- not clear what this is -- I guess it's a consumer's 
client.id? If yes, should we rename the parameter `consumerClientId`?



112 `ApplicationState`: what is the reason to have `allTasks()` and 
`statefulTasks()` -- why not have `statelessTasks()` and 
`statefulTasks()` instead? Or all three?



113 `ApplicationState#computeTaskLags()`: I understand the intent/reason 
why we have this one, but it seems to be somewhat difficult to use 
correctly, as it triggers an internal side-effect... Would it be 
possible to replace this method in favor of passing in a `boolean 
computeTaskLag` parameter into #streamClientState() instead, what might 
make it less error prone to use, as it seems the returned 
`StreamsClient` object would be modified when calling #computeTaskLags() 
and thus both are related to each other?



114 nit/typo: `ApplicationState#streamsClientStates()` returns 
`StreamsClientState` not `StreamsClient`.



115 `StreamsAssignorRetryableException`: not sure if I fully understand 
the purpose of this exception.



116 "No actual changes to functionality": allowing to plug in customer 
TaskAssignor sounds like adding new functionality. Can we rephrase this?




117: What happens if the returned assignment is "invalid" -- for 
example, a task might not have been assigned, or is assigned to two 
nodes? Or a standby is assigned to the same node as its active? Or a 
`StreamsClientAssignment` returns an unknown `ProcessId`? (Not sure if 
this list of potential issues is complete or not...)




-Matthias



On 4/18/24 2:05 AM, Bruno Cadonna wrote:

Hi Sophie,

Thanks for the clarifications!

(1)
What about replacing Node* with KafkaStreams* or StreamsClient*? I 
prefer KafkaStreams* since that class represents the Kafka Streams 
client. I am also fine with KafkaStreamsClient*. I really would like to 
avoid introducing a new term in Kafka Streams for which we already have 
an equivalent term even if it is used on the brokers since that is a 
different level of abstraction. Additionally, I have never been a big 
fan of the term "instance".


(4)
I think the question is if we need to retrieve assignment metadata by 
task for a Kafka client or if it is enough to iterate over the assigned 
tasks. Could you explain why we cannot add 

[jira] [Resolved] (KAFKA-16486) Integrate metric measurability changes in metrics collector

2024-04-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16486.
-
Fix Version/s: 3.8.0
   Resolution: Done

> Integrate metric measurability changes in metrics collector
> ---
>
> Key: KAFKA-16486
> URL: https://issues.apache.org/jira/browse/KAFKA-16486
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)



[jira] [Created] (KAFKA-16586) Test TaskAssignorConvergenceTest failing

2024-04-18 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16586:
---

 Summary: Test TaskAssignorConvergenceTest failing
 Key: KAFKA-16586
 URL: https://issues.apache.org/jira/browse/KAFKA-16586
 Project: Kafka
  Issue Type: Test
  Components: streams, unit tests
Reporter: Matthias J. Sax


{code:java}
java.lang.AssertionError: Assertion failed in randomized test. Reproduce with: 
`runRandomizedScenario(-538095696758490522)`.at 
org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.runRandomizedScenario(TaskAssignorConvergenceTest.java:545)
  at 
org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.randomClusterPerturbationsShouldConverge(TaskAssignorConvergenceTest.java:479){code}
This might expose an actual bug (or incorrect test setup) and should be 
reproducible (did not try it myself yet).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)



[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838829#comment-17838829
 ] 

Matthias J. Sax commented on KAFKA-16514:
-

Thanks for the input. I did not review/vote on the original KIP or the PR. 
Thus, I just assumed there was some mention of static groups... As 
there is nothing about it in the KIP, as you pointed out, I did some digging and 
the PR reveals why it's only implemented for static members: 
[https://github.com/apache/kafka/pull/12035#discussion_r858263213]

We use the admin client's "removeMembersFromConsumerGroup", which only works for 
static members, as it takes the consumer's `group.instance.id` as input. It seems 
it was a pragmatic approach... Re-reading the KIP discussion it seems that 
making it work for regular members would require a change in the consumer API, 
and thus would have been a larger scope KIP (and the idea was to keep the KIP 
scope limited).
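
For reference, this is roughly what the existing admin API allows today (just a 
sketch using classes from org.apache.kafka.clients.admin; the group id and 
group.instance.id values are placeholders):
{code:java}
void removeStaticMember() throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (Admin admin = Admin.create(props)) {
        // works only for members that joined with a group.instance.id (static membership);
        // there is no variant that accepts a plain member.id
        admin.removeMembersFromConsumerGroup(
            "my-streams-app-id",
            new RemoveMembersFromConsumerGroupOptions(
                Collections.singleton(new MemberToRemove("my-instance-1"))))
            .all().get();
    }
}
{code}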

Thus, while we might not need a KIP for Kafka Streams, we would need one for 
the consumer to allow KS to use this newly added API... In the meantime, we 
could still do a small PR to update the JavaDocs to call out the current 
limitation (which will not make it a public contract IMHO, so after we get a 
consumer KIP we can still address this limitation w/o another KIP).

Thoughts?

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16573) Streams does not specify where a Serde is needed

2024-04-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16573:

Priority: Minor  (was: Major)

> Streams does not specify where a Serde is needed
> 
>
> Key: KAFKA-16573
> URL: https://issues.apache.org/jira/browse/KAFKA-16573
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Ayoub Omari
>Priority: Minor
>
> Example topology:
> {code:java}
>  builder
>.table("input", Consumed.`with`(Serdes.String(), Serdes.String()))
>.groupBy((key, value) => new KeyValue(value, key))
>.count()
>.toStream()
>.to("output", Produced.`with`(Serdes.String(), Serdes.Long()))
>  {code}
> At runtime, we get the following exception 
> {code:java}
> Please specify a key serde or set one through 
> StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
> org.apache.kafka.common.config.ConfigException: Please specify a key serde or 
> set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
>     at 
> org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857)
>     at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92)
>     at 
> org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
>     at 
> org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
>     at 
> org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90)
>     at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188)
>     at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143)
>     at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
>     at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258){code}
> The error does not give information about the line or the processor causing 
> the issue.
> Here a Grouped was missing inside the groupBy, but because the groupBy API 
> doesn't force you to define a Grouped, this one can be missed, and it could be 
> difficult to spot in a more complex topology. 
> This also affects someone who needs control over serdes in the topology and 
> doesn't want to define default serdes.
>  
>   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16573) Streams does not specify where a Serde is needed

2024-04-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838827#comment-17838827
 ] 

Matthias J. Sax commented on KAFKA-16573:
-

Thanks for filing this ticket. I think your idea is good; it's for sure an 
improvement over the current state.
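
For reference, the example from the description should work once the serdes are 
passed explicitly via `Grouped` (a sketch in plain Java; the ticket uses 
Scala-style syntax):
{code:java}
builder
    .table("input", Consumed.with(Serdes.String(), Serdes.String()))
    .groupBy((key, value) -> KeyValue.pair(value, key),
             Grouped.with(Serdes.String(), Serdes.String()))
    .count()
    .toStream()
    .to("output", Produced.with(Serdes.String(), Serdes.Long()));
{code}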

> Streams does not specify where a Serde is needed
> 
>
> Key: KAFKA-16573
> URL: https://issues.apache.org/jira/browse/KAFKA-16573
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Ayoub Omari
>Priority: Major
>
> Example topology:
> {code:java}
>  builder
>.table("input", Consumed.`with`(Serdes.String(), Serdes.String()))
>.groupBy((key, value) => new KeyValue(value, key))
>.count()
>.toStream()
>.to("output", Produced.`with`(Serdes.String(), Serdes.Long()))
>  {code}
> At runtime, we get the following exception 
> {code:java}
> Please specify a key serde or set one through 
> StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
> org.apache.kafka.common.config.ConfigException: Please specify a key serde or 
> set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
>     at 
> org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857)
>     at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92)
>     at 
> org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
>     at 
> org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
>     at 
> org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90)
>     at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188)
>     at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143)
>     at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
>     at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258){code}
> The error does not give information about the line or the processor causing 
> the issue.
> Here a Grouped was missing inside the groupBy, but because the groupBy API 
> doesn't force you to define a Grouped, this one can be missed, and it could be 
> difficult to spot in a more complex topology. 
> This also affects someone who needs control over serdes in the topology and 
> doesn't want to define default serdes.
>  
>   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16280) Expose method to determine Metric Measurability

2024-04-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16280.
-
Resolution: Done

> Expose method to determine Metric Measurability
> ---
>
> Key: KAFKA-16280
> URL: https://issues.apache.org/jira/browse/KAFKA-16280
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 3.8.0
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
>  Labels: kip
> Fix For: 3.8.0
>
>
> The Jira is to track the development of KIP-1019: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1019%3A+Expose+method+to+determine+Metric+Measurability]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16280) Expose method to determine Metric Measurability

2024-04-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16280:

Issue Type: Improvement  (was: Bug)

> Expose method to determine Metric Measurability
> ---
>
> Key: KAFKA-16280
> URL: https://issues.apache.org/jira/browse/KAFKA-16280
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Affects Versions: 3.8.0
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
>  Labels: kip
> Fix For: 3.8.0
>
>
> The Jira is to track the development of KIP-1019: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1019%3A+Expose+method+to+determine+Metric+Measurability]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)



Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-04-18 Thread Matthias J. Sax

Thanks for the KIP Alieh! It addresses an important case for error handling.

I agree that using this handler would be an expert API, as mentioned by 
a few people. But I don't think it would be a reason to not add it. It's 
always a tricky tradeoff what to expose to users and to avoid foot guns, 
but we added similar handlers to Kafka Streams, and have good experience 
with it. Hence, I understand, but don't share the concern raised.


I also agree that there is some responsibility by the user to understand 
how such a handler should be implemented to not drop data by accident. 
But it seems unavoidable and acceptable.


While I understand that a "simpler / reduced" API (eg via configs) might 
also work, I personally prefer a full handler. Configs have the same 
issue that they could be misused, potentially leading to incorrectly 
dropped data, but at the same time are less flexible (and thus maybe 
even harder to use correctly...?). Based on my experience, there are also 
often weird corner cases for which it makes sense to also drop records for 
other exceptions, and a full handler has the advantage of full 
flexibility and "absolute power!".


To be fair: I don't know the exact code paths of the producer in 
detail, so please keep me honest. But my understanding is that the KIP 
aims to allow users to react to internal exceptions, and decide to keep 
retrying internally, swallow the error and drop the record, or raise the 
error?


Maybe the KIP would need to be a little bit more precise about what errors we 
want to cover -- I don't think this list must be exhaustive, as we can 
always do a follow-up KIP to also apply the handler to other errors to 
expand the scope of the handler. The KIP does mention examples, but it 
might be good to explicitly state for what cases the handler gets applied?


I am also not sure if CONTINUE and FAIL are enough options? Don't we 
need three options? Or would `CONTINUE` have different meaning depending 
on the type of error? Ie, for a retryable error `CONTINUE` would mean 
keep retrying internally, but for a non-retryable error `CONTINUE` means 
swallow the error and drop the record? This semantic overload seems 
tricky to reason about by users, so it might be better to split `CONTINUE` 
into two cases -> `RETRY` and `SWALLOW` (or some better names).
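
Just to illustrate the idea (purely a hypothetical sketch, none of these names 
exist today):

public enum ClientExceptionHandlerResponse {
    RETRY,    // retriable error: keep retrying internally
    SWALLOW,  // swallow the error and drop the record
    FAIL      // raise the error to the application
}

public interface ClientExceptionHandler {
    ClientExceptionHandlerResponse handle(Exception exception);
}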


Additionally, should we just ship a `DefaultClientExceptionHandler` 
which would return `FAIL`, for backward compatibility? Or should we not have 
any default handler to begin with and allow it to be `null`? I don't see the 
need for a specific `TransactionExceptionHandler`. To me, the goal 
should be to not modify the default behavior at all, but to just allow 
users to change the default behavior if there is a need.


What is missing from the KIP though is how the handler is passed into the 
producer. Would we need a new config that allows setting a custom handler? 
And/or would we allow passing in an instance via the constructor, or add a 
new method to set a handler?



-Matthias

On 4/18/24 10:02 AM, Andrew Schofield wrote:

Hi Alieh,
Thanks for the KIP.

Exception handling in the Kafka producer and consumer is really not ideal.
It’s even harder working out what’s going on with the consumer.

I’m a bit nervous about this KIP and I agree with Chris that it could do with 
additional
motivation. This would be an expert-level interface given how complicated
the exception handling for Kafka has become.

7. The application is not really aware of the batching being done on its behalf.
The ProduceResponse can actually return an array of records which failed
per batch. If you get RecordTooLargeException, and want to retry, you probably
need to remove the offending records from the batch and retry it. This is 
getting fiddly.

8. There is already o.a.k.clients.producer.Callback. I wonder whether an
alternative might be to add a method to the existing Callback interface, such 
as:

   ClientExceptionResponse onException(Exception exception)

It would be called when a ProduceResponse contains an error, but the
producer is going to retry. It tells the producer whether to go ahead with the 
retry
or not. The default implementation would be to CONTINUE, because that’s
just continuing to retry as planned. Note that this is a per-record callback, so
the application would be able to understand which records failed.

By using an existing interface, we already know how to configure it and we know
about the threading model for calling it.


Thanks,
Andrew




On 17 Apr 2024, at 18:17, Chris Egerton  wrote:

Hi Alieh,

Thanks for the KIP! The issue with writing to non-existent topics is
particularly frustrating for users of Kafka Connect and has been the source
of a handful of Jira tickets over the years. My thoughts:

1. An additional detail we can add to the motivation (or possibly rejected
alternatives) section is that this kind of custom retry logic can't be
implemented by hand by, e.g., setting retries to 0 in the producer config
and handling 

Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-04-18 Thread Matthias J. Sax
Andrew, thanks for the details about Consumer internals. That's super 
useful for this discussion! -- And it confirms my understanding.


I don't think we want to use the ConsumerRecord type though, 
because for a DLQ the handler wants to write the message into some DLQ 
topic, and thus needs the key and value, so only 
`ConsumerRecord<byte[], byte[]>` would work (or maybe `ConsumerRecord<ByteBuffer, ByteBuffer>`).


While I would be ok with using `ConsumerRecord`, I don't see a huge 
advantage compared to passing in all fields we are interested in 
one-by-one. In the end, if the data is written into a DLQ topic, the 
`ConsumerRecord` object cannot be reused (but the handler will build a 
`ProducerRecord`), and `ConsumerRecord` would "just" be a container -- I 
don't think it would simplify user-code or provide any other benefit, 
but just add an (unnecessary?) level of wrapping/indirection?


The only advantage I would see, is for the case that new interesting 
metadata fields get added to the message format -- for this case, using 
`ConsumerRecord` would automatically include these new fields, and we 
don't need to modify the exception class to add them explicitly. But as 
this happens very rarely, it does not seem to provide a huge benefit.


In the end, I would be fine either way. Curious to hear what others think.


-Matthias



On 4/18/24 8:41 AM, Andrew Schofield wrote:

Hi,
Thanks for the KIP. I think it’s an interesting idea and it seems to work 
nicely with how
the clients work today.

Recently, I’ve been digging in to the record deserialization code in the 
consumer as
part of implementing KIP-932. It’s pretty nasty in there.

There are essentially two kinds of problems that can be encountered when
converting the response from a Fetch RPC into ConsumerRecords.

1) The batch might be corrupt, so you can’t even work out what records are in 
there.
2) Individual records cannot be deserialized.

In the second case, the consumer accumulates records from a Fetch response until
it hits a record which it cannot deserialize. If it has any parsed records 
already to return
to the application, it parks the RecordDeserializationException and returns the 
records
so far. Then, on the next poll, because there are no parsed records waiting, it 
throws
the RecordDeserializationException. And so it continues until all fetched 
records are
processed.

I don’t really think of org.apache.kafka.common.record.Record as an external 
interface.
I think it’s a bit close to the wire to be putting on a user-facing interface.

We do know almost everything about the bad record at the point where the
deserialization fails. I wonder whether actually we could use
ConsumerRecord<byte[], byte[]> or even ConsumerRecord<ByteBuffer, ByteBuffer> :)
in the constructor for the RecordDeserializationException. I don’t really like 
having
a long list of each of the individual items of the record’s parts like 
timestamp and so on.
It’s nicer to have an interface to the record that we can evolve without having 
to change
the constructor for this exception.

Thanks,
Andrew


On 18 Apr 2024, at 15:13, Frédérik Rouleau  
wrote:

Hi,

But I guess my main question is really about what metadata we really

want to add to `RecordDeserializationException`? `Record` expose all
kind of internal (serialization) metadata like `keySize()`,
`valueSize()` and many more. For the DLQ use-case it seems we don't
really want any of these? So I am wondering if just adding
key/value/ts/headers would be sufficient?



I think that key/value/ts/headers, topicPartition and offset are all we
need. I do not see any usage for other metadata. If someone has a use case,
I would like to know it.

So in that case we can directly add the data into the exception. We can
keep ByteBuffer for the local field instead of byte[], that will avoid
memory allocation if users do not require it.
I wonder if we should return the ByteBuffer or directly the byte[] (or both
?) which is more convenient for end users. Any thoughts?
Then we can have something like:

public RecordDeserializationException(TopicPartition partition,
 long offset,
 ByteBuffer key,
 ByteBuffer value,
 Header[] headers,
 long timestamp,
 String message,
 Throwable cause);

public TopicPartition topicPartition();

public long offset();

public long timestamp();

public byte[] key(); // Will allocate the array on call

public byte[] value(); // Will allocate the array on call

public Header[] headers();
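
To illustrate the DLQ use case with these proposed accessors (rough sketch only;
the accessors do not exist yet, "dlq-topic" is a placeholder, and consumer /
dlqProducer are assumed to be byte[]-typed clients set up elsewhere):

try {
    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
    // process records ...
} catch (RecordDeserializationException e) {
    dlqProducer.send(new ProducerRecord<>("dlq-topic", null, e.timestamp(),
        e.key(), e.value(), Arrays.asList(e.headers())));
    // skip the bad record and keep consuming
    consumer.seek(e.topicPartition(), e.offset() + 1);
}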



Regards,
Fred




Re: Streams group final result: EmitStrategy vs Suppressed

2024-04-18 Thread Matthias J. Sax
The main difference is the internal implementation. Semantically, both 
are equivalent.


suppress() uses an in-memory buffer, while `emitStrategy()` does not, 
but modifies the upstream aggregation operator impl, and waits to send 
results downstream, and thus, it's RocksDB based.
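
For example, both variants produce the same final-result-only output (sketch; 
`stream` is assumed to be a `KStream<String, String>`, and window size and 
grace period are arbitrary):

TimeWindows windows = TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1));

// variant 1: emit strategy set on the windowed aggregation itself (store based)
KTable<Windowed<String>, Long> viaEmitStrategy = stream
    .groupByKey()
    .windowedBy(windows)
    .emitStrategy(EmitStrategy.onWindowClose())
    .count();

// variant 2: suppression applied to the result KTable (in-memory buffer)
KTable<Windowed<String>, Long> viaSuppress = stream
    .groupByKey()
    .windowedBy(windows)
    .count()
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));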



-Matthias


On 4/12/24 10:37 AM, Ayoub wrote:

Hello,

*[Not sure if my email went through as I was not subscribed to this mailing
list. Here is my original email]*

I found that there are two ways to send only the final result of a windowed
groupBy: either using Suppressed.untilWindowCloses on the final KTable, or
EmitStrategy on the windowed stream.

I tried to compare both but didn't find differences in the result they give.

Are there any differences apart from the moment they are defined within the
pipeline? And is there any preference for using one or the other?

Thanks,
Ayoub







Re: Is there any recommendation about header max size?

2024-04-18 Thread Matthias J. Sax
I don't think that there is any specific recommendation. However, there 
is an overall max-message-size config that you need to keep in mind.
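
For example, something like this keeps the original bytes untouched and attaches 
the error as headers (sketch; the topic and header names are arbitrary, and 
originalKey/originalValue/exception/producer are assumed to exist). Just keep in 
mind that the record plus its headers still has to fit the producer's 
max.request.size and the broker/topic message.max.bytes:

ProducerRecord<byte[], byte[]> dlqRecord =
    new ProducerRecord<>("error-topic", originalKey, originalValue);
dlqRecord.headers().add("exception.class",
    exception.getClass().getName().getBytes(StandardCharsets.UTF_8));
dlqRecord.headers().add("exception.message",
    String.valueOf(exception.getMessage()).getBytes(StandardCharsets.UTF_8));
producer.send(dlqRecord);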


-Matthias

On 4/16/24 9:42 AM, Gabriel Giussi wrote:

I have logic in my service to capture exceptions being thrown during
message processing and produce a new message to a different topic with
information about the error. The idea is to leave the message unmodified,
aka produce the exact same bytes to this new topic, therefore I'm planning
on adding the java exception as a header.
By looking at the documentation it is just an array of bytes and it doesn't
say anything about a max size but is there any recommendation about it?
https://kafka.apache.org/documentation/#recordheader



[jira] [Commented] (KAFKA-16585) No way to forward message from punctuation method in the FixedKeyProcessor

2024-04-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838730#comment-17838730
 ] 

Matthias J. Sax commented on KAFKA-16585:
-

Thanks for raising this ticket. Wondering how we could address it though... 
Given that the "contract" is that the record key is not modified, but there is 
no input record, how could the key be set in a meaningful way? – The only thing 
I can think of right now would be to set `key = null`, but it's still 
semantically questionable...

Can you provide more details on what you are trying to do?
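
One possible workaround (if the fixed-key optimization is not strictly needed) 
would be to use the regular Processor API, where `Record` has a public 
constructor and can be forwarded from a punctuator. Just a sketch; the key and 
value are placeholders:
{code:java}
public class MyProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME,
            timestamp -> context.forward(new Record<>("some-key", "some-value", timestamp)));
    }

    @Override
    public void process(final Record<String, String> record) {
        context.forward(record);
    }
}
{code}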

> No way to forward message from punctuation method in the FixedKeyProcessor
> --
>
> Key: KAFKA-16585
> URL: https://issues.apache.org/jira/browse/KAFKA-16585
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.2
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> The FixedKeyProcessorContext can forward only FixedKeyRecord. This class 
> doesn't have a public constructor and can only be created based on existing 
> records. But such a record is usually absent in the punctuation method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16567) Add New Stream Metrics based on KIP-869

2024-04-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838728#comment-17838728
 ] 

Matthias J. Sax commented on KAFKA-16567:
-

Why is this ticket marked as "blocker" for 4.0 release?

> Add New Stream Metrics based on KIP-869
> ---
>
> Key: KAFKA-16567
> URL: https://issues.apache.org/jira/browse/KAFKA-16567
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Walter Hernandez
>Assignee: Walter Hernandez
>Priority: Blocker
>  Labels: kip
> Fix For: 4.0.0
>
>
> Add the following metrics to the state updater:
>  * restoring-active-tasks: count
>  * restoring-standby-tasks: count
>  * paused-active-tasks: count
>  * paused-standby-tasks: count
>  * idle-ratio: percentage
>  * restore-ratio: percentage
>  * checkpoint-ratio: percentage
>  * restore-records-total: count
>  * restore-records-rate: rate
>  * restore-call-rate: rate



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16567) Add New Stream Metrics based on KIP-869

2024-04-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16567:

Component/s: streams

> Add New Stream Metrics based on KIP-869
> ---
>
> Key: KAFKA-16567
> URL: https://issues.apache.org/jira/browse/KAFKA-16567
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Walter Hernandez
>Assignee: Walter Hernandez
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Add the following metrics to the state updater:
>  * restoring-active-tasks: count
>  * restoring-standby-tasks: count
>  * paused-active-tasks: count
>  * paused-standby-tasks: count
>  * idle-ratio: percentage
>  * restore-ratio: percentage
>  * checkpoint-ratio: percentage
>  * restore-records-total: count
>  * restore-records-rate: rate
>  * restore-call-rate: rate



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16567) Add New Stream Metrics based on KIP-869

2024-04-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16567:

Labels: kip  (was: )

> Add New Stream Metrics based on KIP-869
> ---
>
> Key: KAFKA-16567
> URL: https://issues.apache.org/jira/browse/KAFKA-16567
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Walter Hernandez
>Assignee: Walter Hernandez
>Priority: Blocker
>  Labels: kip
> Fix For: 4.0.0
>
>
> Add the following metrics to the state updater:
>  * restoring-active-tasks: count
>  * restoring-standby-tasks: count
>  * paused-active-tasks: count
>  * paused-standby-tasks: count
>  * idle-ratio: percentage
>  * restore-ratio: percentage
>  * checkpoint-ratio: percentage
>  * restore-records-total: count
>  * restore-records-rate: rate
>  * restore-call-rate: rate



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16263) Add Kafka Streams docs about available listeners/callback

2024-04-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838727#comment-17838727
 ] 

Matthias J. Sax commented on KAFKA-16263:
-

Yes, these are the handlers this ticket refers do.

> Add Kafka Streams docs about available listeners/callback
> -
>
> Key: KAFKA-16263
> URL: https://issues.apache.org/jira/browse/KAFKA-16263
> Project: Kafka
>  Issue Type: Task
>  Components: docs, streams
>            Reporter: Matthias J. Sax
>Assignee: Kuan Po Tseng
>Priority: Minor
>  Labels: beginner, newbie
>
> Kafka Streams allows to register all kind of listeners and callback (eg, 
> uncaught-exception-handler, restore-listeners, etc) but those are not in the 
> documentation.
> A good place might be 
> [https://kafka.apache.org/documentation/streams/developer-guide/running-app.html]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16336) Remove Deprecated metric standby-process-ratio

2024-04-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838724#comment-17838724
 ] 

Matthias J. Sax commented on KAFKA-16336:
-

The next planned release is 3.8, but we can work on this ticket only for 4.0, 
as it's a breaking change that's only allowed for a major release. – So this 
ticket cannot be picked up yet.

> Remove Deprecated metric standby-process-ratio
> --
>
> Key: KAFKA-16336
> URL: https://issues.apache.org/jira/browse/KAFKA-16336
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>            Reporter: Matthias J. Sax
>Assignee: Walter Hernandez
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Metric "standby-process-ratio" was deprecated in 3.5 release via 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-04-18 Thread Matthias J. Sax

Hi,

I am actually not sure if using `Record` is really the right thing? 
While `Record` is technically public API, it does not seem to be 
intended to be exposed to end users?


But I guess my main question is really about what metadata we really 
want to add to `RecordDeserializationException`? `Record` expose all 
kind of internal (serialization) metadata like `keySize()`, 
`valueSize()` and many more. For the DLQ use-case it seems we don't 
really want any of these? So I am wondering if just adding 
key/value/ts/headers would be sufficient?


The motivation section of the KIP is somewhat spare about DLQ details, 
so it's hard to judge what is needed / useful and would would be a leaky 
abstraction?


About "when we cannot construct a `ConsumerRecord` -- I was not really 
digging into it until know, and was just following Kirks comment 
blindly. But looking into the code, we would only not be able to 
construct a `CosumerRecord` when either key or value deserialization 
fails? But as we would pass in byte[] type it would not matter. -- Kirk 
did also mention a corrupted batch, but it seems for this case we might 
not even hit the deserialization code path, but would error out earlier?


I was also looking into the build setup, and I think the idea of the 
import control is to have some sanity check about import dependencies. I 
currently don't see why we should not add an allow rule for Record.


But if we decide to not pass in Record/ConsumerRecord both questions are 
void anyway. Of course, for this case, we would need to add a getter 
method for each metadata field we add (but I think that would be totally 
ok?)


I also see now that the old constructor is deprecated, and thus, I 
think using `Optional` as a return type is not required (already reflected 
on the wiki page).


Bottom line seems to be: the motivation about what metadata is needed 
for the DLQ use-case is not described in much detail and thus it's hard 
to judge what the right design might be?


The wiki account thing is unfortunately nothing we can fix on our side. 
We did file a ticket with INFRA team, but need to wait for them to 
address it... In the meantime, if you can provide the missing 
information, and what you want to get edited, I can help to update the 
wiki page accordingly.



-Matthias

On 4/16/24 11:18 AM, Sophie Blee-Goldman wrote:

Also ignore everything I said about Streams earlier. I didn't look closely
enough on my first pass over the KIP and thought this was changing the
DeserializationExceptionHandler in Streams. I see now that this is
actually about the consumer client's DeserializationException so everything
I said about using a ByteArray Deserialization and Kafka Streams doesn't
apply here. The important thing is just that it still deserializes one record
at a time, and essentially throws this when it fails to convert the Record
type into a ConsumerRecord type. So there's always only one record at a time
to consider.

Sorry for any confusion I caused

On Tue, Apr 16, 2024 at 11:15 AM Sophie Blee-Goldman 
wrote:


Ah, thanks for the additional context. I should have looked at the code
before I opened my mouth (so to speak)

In that case, I fully agree that using Record instead of ConsumerRecord
makes sense. It does indeed seem like by definition, if there is a
DeserializationException then there is no ConsumerRecord since this
is where/how it gets thrown:

try {
...
return new ConsumerRecord<>(...);
} catch (RuntimeException e) {
...
throw new RecordDeserializationException(...);
}

As you mentioned the Record is an input to the method so we definitely have
one of those, and imo, it makes sense to use. As far as I can tell it's just
a regular public interface, so exposing it shouldn't be an issue just based
on the class itself. But I'm still a bit concerned about the checkstyle
complaint.

I'll try to find someone who can explain why or if we should avoid returning
a Record type here. Other than that, I'd say the KIP LGTM as-is and we could
kick off voting.

On Tue, Apr 16, 2024 at 10:47 AM Frédérik Rouleau
 wrote:


Thanks Sophie,

I can write something in the KIP on how KStreams solves that issue, but as
I can't create a Wiki account, I will have to find someone to do this on my
behalf (if someone can work on solving that wiki account creation, it would
be great).

The biggest difference between Record and ConsumerRecord is that data are
stored respectively using ByteBuffer and Byte array.

For the Record option, the object already exists in the parsing method, so
it's roughly just a parameter type change in the Exception. The point is
just about exposing the Record class externally. By the way, the name
Record is also making some IDEs a bit crazy by confusing it with the new
Java Record feature. An alternative could be to create another wrapper type
or just include the key and value ByteBuffer in the
RecordDeserializationException itself.

For the ConsumerRecord option, it requires allocating byte arrays, even

Re: [ANNOUNCE] New Kafka PMC Member: Greg Harris

2024-04-18 Thread Matthias J. Sax

Congrats Greg!

On 4/15/24 10:44 AM, Hector Geraldino (BLOOMBERG/ 919 3RD A) wrote:

Congrats! Well deserved

From: d...@kafka.apache.org At: 04/13/24 14:42:22 UTC-4:00To:  
d...@kafka.apache.org
Subject: [ANNOUNCE] New Kafka PMC Member: Greg Harris

Hi all,

Greg Harris has been a Kafka committer since July 2023. He has remained
very active and instructive in the community since becoming a committer.
It's my pleasure to announce that Greg is now a member of Kafka PMC.

Congratulations, Greg!

Chris, on behalf of the Apache Kafka PMC




[jira] [Created] (KAFKA-16575) Automatically remove KTable aggregation result when group becomes empty

2024-04-17 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16575:
---

 Summary: Automatically remove KTable aggregation result when group 
becomes empty
 Key: KAFKA-16575
 URL: https://issues.apache.org/jira/browse/KAFKA-16575
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


Using `KTable.groupBy(...).aggregate(...)` can handle updates (inserts, 
deletes, actual updates) of the input KTable, by calling the provided `Adder` 
and `Subtractor`. However, when all records from the input table (which map to 
the same group/row in the result table) get removed, the result entry is not 
removed automatically.

For example, if we implement a "count", the count would go to zero for a group 
by default, instead of removing the row from the result, if all input records 
for this group got deleted.

Users can let their `Subtractor` return `null` for this case, to actually 
delete the row, but it's not well documented and it seems it should be a 
built-in feature of the table-aggregation to remove "empty groups" from the 
result, instead of relying on "correct" behavior of user-code.

(Also the built-in `count()` does not return `null`, but actually zero...)
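
For illustration, a count-like aggregation using this workaround might look like 
the following (sketch only):
{code:java}
KTable<String, String> input = builder.table("input",
    Consumed.with(Serdes.String(), Serdes.String()));

KTable<String, Long> groupCounts = input
    .groupBy((key, value) -> KeyValue.pair(value, key),
        Grouped.with(Serdes.String(), Serdes.String()))
    .aggregate(
        () -> 0L,
        (groupKey, newValue, agg) -> agg + 1,                          // adder
        (groupKey, oldValue, agg) -> (agg - 1 == 0) ? null : agg - 1,  // subtractor: null deletes empty groups
        Materialized.with(Serdes.String(), Serdes.Long()));
{code}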

An internal counter of how many elements are in a group should be sufficient. Of 
course, there are backward compatibility questions we need to answer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)



[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-12 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836671#comment-17836671
 ] 

Matthias J. Sax commented on KAFKA-16514:
-

CloseOption was introduced via 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group]

The reasoning about the design should be on the KIP and corresponding DISCUSS 
thread.

I agree that the JavaDocs are missing a lot of information. Would you be 
interested to do a PR to improve the JavaDocs?

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shut down a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shut down and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-04-10 Thread Matthias J. Sax

Thanks for the KIP. Great discussion.

I am not sure I understand the proposal from Bruno to hand in the 
processor node id? Isn't this internal (I could not even find it quickly)? 
We do have a processor name, right? Or am I mixing something up?


Another question is about `ProcessingContext` -- it contains a lot of 
(potentially irrelevant?) metadata. We should think carefully about what 
we want to pass in and what not -- removing stuff is hard, but adding 
stuff is easy. It's always an option to create a new interface that only 
exposes stuff we find useful, and allows us to evolve this interface 
independently of others. Re-using an existing interface always has the 
danger of introducing an undesired coupling that could bite us in the 
future. -- It makes total sense to pass in `RecordMetadata`, but 
`ProcessingContext` (even if already limited compared to 
`ProcessorContext`) still seems to be too broad? For example, there are 
`getStateStore()` and `schedule()` methods which I think we should not 
expose.
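
To make this concrete, a rough sketch of what such a narrowed, 
handler-only view could look like; all names below are purely 
illustrative and not part of the KIP:

import java.util.Optional;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.RecordMetadata;

// Hypothetical sketch only -- none of these names are proposed by KIP-1033.
// The idea: expose read-only metadata useful for error handling,
// without getStateStore() or schedule().
public interface ErrorHandlerMetadata {
    String applicationId();                    // as on ProcessingContext
    TaskId taskId();                           // as on ProcessingContext
    Optional<RecordMetadata> recordMetadata(); // topic/partition/offset, if available
}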


The other interesting question is about "what record gets passed in". 
For the PAPI, passing in the Processor's input record makes a lot of 
sense. However, for DSL operators, I am not 100% sure? The DSL often 
uses internal types not exposed to the user, and thus I am not sure if 
users could write useful code for this case? -- In general, I still 
agree that the handler should be implemented with a try-catch around 
`Processor.process()`, but it might not be too useful for DSL 
processors. Hence, I am wondering if we need to do something more in the 
DSL? I don't have a concrete proposal (a few high-level ideas only), and 
if we don't do anything special for the DSL I am ok with moving forward 
with this KIP as-is, but we should be aware of potential limitations for 
DSL users. We can always do a follow-up KIP to close gaps when we 
understand the impact better -- covering the DSL would also expand the 
scope of this KIP significantly...


About the metric: just to double check, do we think it's worth adding a 
new metric? Or could we re-use the existing "dropped records" metric?




-Matthias


On 4/10/24 5:11 AM, Sebastien Viale wrote:

Hi,

You are right, it will simplify types.

We update the KIP

regards

Sébastien *VIALE*

*MICHELIN GROUP* - Information Technology
*Technical Expert Kafka*

  Carmes / Bâtiment A17 4e / 63100 Clermont-Ferrand


*From:* Bruno Cadonna 
*Sent:* Wednesday, April 10, 2024 10:38
*To:* dev@kafka.apache.org 
*Subject:* [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception 
handler for exceptions occurring during processing


Hi Loïc, Damien, and Sébastien,

Great that we are converging!


3.
Damien and Loïc, I think in your examples the handler will receive a 
Record because a Record is passed to 
the processor in the following code line:
https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152
 


I see that we do not need to pass a Record into the handler just because we do that for the DeserializationExceptionHandler
and the ProductionExceptionHandler. When those two handlers are called,
the record is already serialized. This is not the case for the
ProcessingExceptionHandler. However, I would propose to use Record
for the record that is passed to the ProcessingExceptionHandler because
it makes the handler API more flexible.


Best,
Bruno






On 4/9/24 9:09 PM, Loic Greffier wrote:
 > Hi Bruno and Bill,
 >
 > To complete Damien's points regarding point 3.
 >
 > Processing errors are caught and handled by the 
ProcessingErrorHandler, at the precise moment when records are processed 
by processor nodes. The handling will be performed in the "process" 
method of the ProcessorNode, such as:

 >
 > public void process(final Record record) {
 >     ...
 >     try {
 >         ...
 >     } catch (final ClassCastException e) {
 >         ...
 >     } catch (Exception e) {
 >         ProcessingExceptionHandler.ProcessingHandlerResponse response =
 >             this.processingExceptionHandler.handle(internalProcessorContext, (Record) record, e);
 >
 >         if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
 >             throw new StreamsException("Processing exception handler is set to fail upon" +
 >                 " a processing error. If you would rather have the streaming pipeline" +
 >                 " continue after a processing error, please set the " +
 >                 DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
 >                 e);
 >         }
 >     }
 > }
 > As you can 

Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-04-10 Thread Matthias J. Sax

Thanks for the KIP, Fred.

A couple of nits: it's not clear from the "Public API" section what is new 
and what is existing API without going back to the code. For existing 
methods which are not changed, it's best to omit them. -- It would also be 
best to only put down the interface itself, but not the implementation 
(i.e., no private members and no method bodies).


Thus, it might be better to do something like this:

+

public class RecordDeserializationException extends SerializationException {

    // newly added
    public RecordDeserializationException(TopicPartition partition,
                                          ConsumerRecord<byte[], byte[]> record,
                                          String message,
                                          Throwable cause);

    public ConsumerRecord<byte[], byte[]> getConsumerRecord();
}

+

From the description it's not clear to me if you propose to change the 
existing constructor or to add a new one. From a compatibility POV, we 
cannot really change the existing constructor (but we could deprecate it 
and remove it in the future, and add a new one in parallel). But I also 
agree with Kirk that there could be cases for which we cannot pass in a 
`ConsumerRecord`, and thus keeping the old constructor could make sense 
(and changing the new getter to return an `Optional`).


Another small thing: in Kafka, getter methods do not use a `get` 
prefix, and thus it should be `consumerRecord()` w/o the "get".




-Matthias


On 4/10/24 4:21 PM, Kirk True wrote:

Hi Fred,

Thanks for the KIP!

Questions/comments:

- How do we handle the case where CompletedFetch.parseRecord isn’t able to 
construct a well-formed ConsumerRecord (i.e. the values it needs are 
missing/corrupted/etc.)?
- Please change RecordDeserializationException’s getConsumerRecord() method to be 
named consumerRecord() to be consistent.
- Should we change the return type of consumerRecord() to be 
Optional<ConsumerRecord<byte[], byte[]>> for the cases where even a “raw” 
ConsumerRecord can’t be created?
- To avoid the above, does it make sense to include a Record object instead of a 
ConsumerRecord? The former doesn’t include the leaderEpoch or TimestampType, 
but maybe that’s OK?

Thanks,
Kirk


On Apr 10, 2024, at 8:47 AM, Frédérik Rouleau  
wrote:

Hi everyone,

To make implementation of DLQ in consumer easier, I would like to add the
raw ConsumerRecord into the RecordDeserializationException.
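
For illustration, a rough sketch of the consumer-side DLQ use case this 
would enable. It assumes the accessor is named consumerRecord() and returns 
the raw ConsumerRecord<byte[], byte[]>; the dead-letter topic name and the 
producer wiring are illustrative only:

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordDeserializationException;

static void pollWithDlq(final KafkaConsumer<String, String> consumer,
                        final KafkaProducer<byte[], byte[]> dlqProducer) {
    try {
        consumer.poll(Duration.ofSeconds(1))
                .forEach(record -> { /* normal processing */ });
    } catch (final RecordDeserializationException e) {
        // proposed accessor: the raw record that could not be deserialized
        final ConsumerRecord<byte[], byte[]> raw = e.consumerRecord();
        dlqProducer.send(new ProducerRecord<>("dlq-topic", raw.key(), raw.value()));
        // skip past the poison pill and keep consuming
        consumer.seek(e.topicPartition(), e.offset() + 1);
    }
}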

Details are in KIP-1036.

Thanks for your feedback.

Regards,
Fred





[jira] [Created] (KAFKA-16508) Infinite loop if output topic does not exist

2024-04-10 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16508:
---

 Summary: Infinite loop if output topic does not exist
 Key: KAFKA-16508
 URL: https://issues.apache.org/jira/browse/KAFKA-16508
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


Kafka Streams supports `ProductionExceptionHandler` to drop records on error 
when writing into an output topic.

However, if the output topic does not exist, the corresponding error cannot be 
skipped over because the handler is not called.

The issue is that the producer internally retries fetching the output topic 
metadata until it times out, and a `TimeoutException` (which is a 
`RetriableException`) is returned via the registered `Callback`. However, for 
a `RetriableException` there is a different code path and the 
`ProductionExceptionHandler` is not called.

In general, Kafka Streams correctly tries to handle as many errors as possible 
internally, and a `RetriableException` falls into this category (and thus there is 
no need to call the handler). However, for this particular case, just retrying 
does not solve the issue – it's unclear if throwing a retryable 
`TimeoutException` is actually the right thing to do for the Producer? It's also 
not clear what the right way to address this ticket would be (currently, we cannot 
really detect this case, except if we did some nasty error message String 
comparison, which sounds hacky...)
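
For reference, a minimal sketch of the kind of handler this ticket is about; as 
described above, it is currently never invoked when the output topic is missing, 
because the producer surfaces a retriable `TimeoutException` instead. The class 
name below is illustrative only:

{code:java}
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

// Drops any record that fails to be produced, instead of failing the application.
public class DropOnErrorProductionExceptionHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        return ProductionExceptionHandlerResponse.CONTINUE; // skip the record
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}
{code}

Such a handler would be registered via the `default.production.exception.handler` 
config (`StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG`).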



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

