[jira] [Created] (KAFKA-16955) ConcurrentModification exception thrown by KafkaStream threadState access

2024-06-13 Thread Rohan Desai (Jira)
Rohan Desai created KAFKA-16955:
---

 Summary: ConcurrentModification exception thrown by KafkaStream 
threadState access
 Key: KAFKA-16955
 URL: https://issues.apache.org/jira/browse/KAFKA-16955
 Project: Kafka
  Issue Type: Bug
Reporter: Rohan Desai


We see occasional ConcurrentModificationExceptions thrown when accessing 
threadState:

 

 
{code:java}
[ERROR] 2024-06-11 07:56:42.417 [e2e-de9a85a4-b1d9-4807-bc34-e269f760bcd7-StreamThread-2] ResponsiveKafkaStreams - stream-client [e2e-de9a85a4-b1d9-4807-bc34-e269f760bcd7] Replacing thread in the streams uncaught exception handler
org.apache.kafka.streams.errors.StreamsException: java.util.ConcurrentModificationException
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:729) ~[kafka-streams-3.7.0.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645) [kafka-streams-3.7.0.jar:?]
Caused by: java.util.ConcurrentModificationException
	at java.util.HashMap$HashIterator.nextNode(HashMap.java:1605) ~[?:?]
	at java.util.HashMap$ValueIterator.next(HashMap.java:1633) ~[?:?]
	at org.apache.kafka.streams.KafkaStreams$StreamStateListener.maybeSetRunning(KafkaStreams.java:656) ~[kafka-streams-3.7.0.jar:?]
	at org.apache.kafka.streams.KafkaStreams$StreamStateListener.onChange(KafkaStreams.java:686) ~[kafka-streams-3.7.0.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:261) ~[kafka-streams-3.7.0.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:1120) ~[kafka-streams-3.7.0.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:921) ~[kafka-streams-3.7.0.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686) ~[kafka-streams-3.7.0.jar:?]
	... 1 more
{code}
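The trace shows a HashMap iterator in `StreamStateListener.maybeSetRunning` failing while the threadState map is mutated. The hazard is easy to reproduce in isolation. This is an illustrative sketch, not Kafka code: it triggers the structural modification from the same thread so the failure is deterministic, and it shows one possible fix direction (a ConcurrentHashMap, whose iterators are weakly consistent and never throw).

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CmeDemo {
    // Iterating a plain HashMap while it is structurally modified throws
    // ConcurrentModificationException on the next call to next().
    static boolean failsOnPlainHashMap() {
        Map<Long, String> threadState = new HashMap<>();
        threadState.put(1L, "CREATED");
        threadState.put(2L, "CREATED");
        try {
            for (String state : threadState.values()) {
                threadState.put(3L, "RUNNING"); // structural modification mid-iteration
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // ConcurrentHashMap tolerates concurrent modification: its iterators
    // are weakly consistent and do not throw.
    static boolean failsOnConcurrentMap() {
        Map<Long, String> threadState = new ConcurrentHashMap<>();
        threadState.put(1L, "CREATED");
        threadState.put(2L, "CREATED");
        try {
            for (String state : threadState.values()) {
                threadState.put(3L, "RUNNING");
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }
}
```

The alternative fix direction would be to synchronize every reader and writer of the map on a common lock.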
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16876) TaskManager.handleRevocation doesn't handle errors thrown from task.prepareCommit

2024-05-31 Thread Rohan Desai (Jira)
Rohan Desai created KAFKA-16876:
---

 Summary: TaskManager.handleRevocation doesn't handle errors thrown 
from task.prepareCommit
 Key: KAFKA-16876
 URL: https://issues.apache.org/jira/browse/KAFKA-16876
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.6.0
Reporter: Rohan Desai


`TaskManager.handleRevocation` does not handle exceptions thrown by 
`task.prepareCommit`. In the particular instance I observed, `prepareCommit` 
flushed caches, which led to downstream `producer.send` calls that threw a 
`TaskMigratedException`. This means that the tasks that need to be revoked are 
not suspended by `handleRevocation`. `ConsumerCoordinator` stores the thrown 
exception and then moves on to the other task assignment callbacks. One of 
these, `StreamsPartitionAssignor.onAssignment`, tries to close the tasks and 
raises an `IllegalStateException`. Fortunately, it dirty-closes the tasks if 
clean close fails, so we don't leak any tasks. I think there are maybe two bugs 
here:
 # `TaskManager.handleRevocation` should handle errors from `prepareCommit`. It 
should try not to leave any revoked tasks in an unsuspended state.
 # `ConsumerCoordinator` just throws the first exception that it sees. But it 
seems bad to throw the `TaskMigratedException` and drop the 
`IllegalStateException` (though in this case I think it's relatively benign). I 
think on `IllegalStateException` we really want the streams thread to exit. One 
idea here is to have `ConsumerCoordinator` throw an exception type that 
includes the other exceptions it has seen in another field, but this breaks 
the contract for clients that catch specific exceptions. I'm not sure of a 
clean solution, but I think it's at least worth recording that it would be 
preferable to have the caller of `poll` handle all the thrown exceptions rather 
than just the first one.
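Roughly what I have in mind for bug 1 (and for keeping later exceptions visible, per bug 2) is sketched below. This is a hypothetical illustration, not the actual TaskManager code; the Task interface here is invented:

```java
import java.util.List;

public class RevocationSketch {
    // Invented stand-in for the real task abstraction.
    interface Task {
        String id();
        void prepareCommit();
        void suspend();
        boolean suspended();
    }

    // Suspend every revoked task even if prepareCommit throws; return the
    // first error (with later errors attached as suppressed) for the caller
    // to rethrow.
    static RuntimeException handleRevocation(List<Task> revoked) {
        RuntimeException first = null;
        for (Task task : revoked) {
            try {
                task.prepareCommit();
            } catch (RuntimeException e) {
                if (first == null) first = e;
                else first.addSuppressed(e); // don't silently drop later errors
            } finally {
                task.suspend(); // never leave a revoked task unsuspended
            }
        }
        return first;
    }

    // Simple fake used to exercise the sketch.
    static class FakeTask implements Task {
        final String id;
        final boolean failCommit;
        boolean suspended;
        FakeTask(String id, boolean failCommit) { this.id = id; this.failCommit = failCommit; }
        public String id() { return id; }
        public void prepareCommit() {
            if (failCommit) throw new RuntimeException("commit failed for " + id);
        }
        public void suspend() { suspended = true; }
        public boolean suspended() { return suspended; }
    }
}
```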

 

Here is the IllegalStateException stack trace I observed:
{code:java}
[       508.535] [service_application2] [inf] [ERROR] 2024-05-30 06:35:04.556 
[e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] TaskManager - 
stream-thread [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-St
reamThread-1] Failed to close task 0_3 cleanly. Attempting to close remaining 
tasks before re-throwing:
[       508.535] [service_application2] [inf] java.lang.IllegalStateException: 
Illegal state RUNNING while closing active task 0_3
[       508.535] [service_application2] [inf] at 
org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:673)
 ~[kafka-streams-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.streams.processor.internals.StreamTask.closeClean(StreamTask.java:546)
 ~[kafka-streams-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.streams.processor.internals.TaskManager.closeTaskClean(TaskManager.java:1295)
 ~[kafka-streams-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:630)
 [kafka-streams-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350)
 [kafka-streams-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381)
 [kafka-streams-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315)
 [kafka-clients-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469)
 [kafka-clients-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478)
 [kafka-clients-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389)
 [kafka-clients-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564)
 [kafka-clients-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220)
 [kafka-clients-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179) 
[kafka-clients-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159) 
[kaf
{code}

Re: [VOTE] KIP-924: customizable task assignment for Streams

2024-05-07 Thread Rohan Desai
Thanks all! KIP-924 is accepted with 4 +1 (binding) votes from Sophie
Blee-Goldman, Matthias Sax, Bruno Cadonna, and Lucas Brutschy.

On Tue, Apr 16, 2024 at 1:46 PM Rohan Desai  wrote:

>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams
>
> As this KIP has been open for a while, and gone through a couple rounds of
> review/revision, I'm calling a vote to get it approved.
>


Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-25 Thread Rohan Desai
117: as Sophie laid out, there are two cases here, right:
1. cases that are considered invalid by the existing assignors but are
still valid assignments, in the sense that they can be used to generate a
valid consumer group assignment (from the perspective of the consumer group
protocol). An assignment that excludes a task is one such example, and
Sophie pointed out a good use case for it. I also think it makes sense to
allow these. It's hard to predict how a user might want to use the custom
assignor, and it's reasonable to expect them to use it with care rather
than hand-holding them.
2. cases that are not valid because it is impossible to compute a valid
consumer group assignment from them. In this case it seems totally
reasonable to just throw a fatal exception that gets passed to the uncaught
exception handler. If this case happens then there is some bug in the
user's assignor, and it's totally reasonable to fail the application. We
_could_ try to be more graceful and fall back to one of the existing
assignors. But it's usually better to fail hard and fast when some illegal
state is detected, imo.
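Case 2 could look something like the sketch below. This is purely illustrative — the names and the validity check are invented, not Streams internals:

```java
import java.util.Map;
import java.util.Set;
import java.util.UUID;

public class AssignmentValidationSketch {
    // Invented fatal error type for an assignment that cannot be turned
    // into a valid consumer group assignment.
    static class InvalidTaskAssignmentException extends RuntimeException {
        InvalidTaskAssignmentException(String msg) { super(msg); }
    }

    // A task assigned to an unknown process is one example of case 2:
    // no consumer group assignment can be built from it, so fail fast
    // rather than silently falling back to a built-in assignor.
    static void validate(Map<String, UUID> taskToProcess, Set<UUID> liveProcesses) {
        for (Map.Entry<String, UUID> e : taskToProcess.entrySet()) {
            if (!liveProcesses.contains(e.getValue())) {
                throw new InvalidTaskAssignmentException(
                    "task " + e.getKey() + " assigned to unknown process " + e.getValue());
            }
        }
    }
}
```

Note that case 1 (e.g. an assignment that merely excludes a task) would deliberately pass a check like this.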

On Fri, Apr 19, 2024 at 4:18 PM Rohan Desai  wrote:

> Bruno, I've incorporated your feedback into the KIP document.
>
> On Fri, Apr 19, 2024 at 3:55 PM Rohan Desai 
> wrote:
>
>> Thanks for the feedback Bruno! For the most part I think it makes sense,
>> but leaving a couple follow-up thoughts/questions:
>>
>> re 4: I think Sophie's point was slightly different - that we might want
>> to wrap the return type for `assign` in a class so that its easily
>> extensible. This makes sense to me. Whether we do that or not, we can have
>> the return type be a Set instead of a Map as well.
>>
>> re 6: Yes, it's a callback that's called with the final assignment. I
>> like your suggested name.
>>
>> On Fri, Apr 5, 2024 at 12:17 PM Rohan Desai 
>> wrote:
>>
>>> Thanks for the feedback Sophie!
>>>
>>> re1: Totally agree. The fact that it's related to the partition assignor
>>> is clear from just `task.assignor`. I'll update.
>>> re3: This is a good point, and something I would find useful personally.
>>> I think its worth adding an interface that lets the plugin observe the
>>> final assignment. I'll add that.
>>> re4: I like the new `NodeAssignment` type. I'll update the KIP with that.
>>>
>>> On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai 
>>> wrote:
>>>
>>>> Thanks for the feedback so far! I think pretty much all of it is
>>>> reasonable. I'll reply to it inline:
>>>>
>>>> > 1. All the API logic is granular at the Task level, except the
>>>> previousOwnerForPartition func. I’m not clear what’s the motivation
>>>> behind it, does our controller also want to change how the
>>>> partitions->tasks mapping is formed?
>>>> You're right that this is out of place. I've removed this method as
>>>> it's not needed by the task assignor.
>>>>
>>>> > 2. Just on the API layering itself: it feels a bit weird to have the
>>>> three built-in functions (defaultStandbyTaskAssignment etc) sitting in
>>>> the ApplicationMetadata class. If we consider them as some default util
>>>> functions, how about introducing moving those into their own static util
>>>> methods to separate from the ApplicationMetadata “fact objects” ?
>>>> Agreed. Updated in the latest revision of the kip. These have been
>>>> moved to TaskAssignorUtils
>>>>
>>>> > 3. I personally prefer `NodeAssignment` to be a read-only object
>>>> containing the decisions made by the assignor, including the
>>>> requestFollowupRebalance flag. For manipulating the half-baked results
>>>> inside the assignor itself, maybe we can just be flexible to let users use
>>>> whatever struts / their own classes even, if they like. WDYT?
>>>> Agreed. Updated in the latest version of the kip.
>>>>
>>>> > 1. For the API, thoughts on changing the method signature to return a
>>>> (non-Optional) TaskAssignor? Then we can either have the default
>>>> implementation return new HighAvailabilityTaskAssignor or just have a
>>>> default implementation class that people can extend if they don't want to
>>>> implement every method.
>>>> Based on some other discussion, I actually decided to get rid of the
>>>> plugin interface, and instead use config to specify individual plugin
>>>> behaviour. So the method you'

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-19 Thread Rohan Desai
Bruno, I've incorporated your feedback into the KIP document.

On Fri, Apr 19, 2024 at 3:55 PM Rohan Desai  wrote:

> Thanks for the feedback Bruno! For the most part I think it makes sense,
> but leaving a couple follow-up thoughts/questions:
>
> re 4: I think Sophie's point was slightly different - that we might want
> to wrap the return type for `assign` in a class so that its easily
> extensible. This makes sense to me. Whether we do that or not, we can have
> the return type be a Set instead of a Map as well.
>
> re 6: Yes, it's a callback that's called with the final assignment. I like
> your suggested name.
>
> On Fri, Apr 5, 2024 at 12:17 PM Rohan Desai 
> wrote:
>
>> Thanks for the feedback Sophie!
>>
>> re1: Totally agree. The fact that it's related to the partition assignor
>> is clear from just `task.assignor`. I'll update.
>> re3: This is a good point, and something I would find useful personally.
>> I think its worth adding an interface that lets the plugin observe the
>> final assignment. I'll add that.
>> re4: I like the new `NodeAssignment` type. I'll update the KIP with that.
>>
>> On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai 
>> wrote:
>>
>>> Thanks for the feedback so far! I think pretty much all of it is
>>> reasonable. I'll reply to it inline:
>>>
>>> > 1. All the API logic is granular at the Task level, except the
>>> previousOwnerForPartition func. I’m not clear what’s the motivation
>>> behind it, does our controller also want to change how the
>>> partitions->tasks mapping is formed?
>>> You're right that this is out of place. I've removed this method as it's
>>> not needed by the task assignor.
>>>
>>> > 2. Just on the API layering itself: it feels a bit weird to have the
>>> three built-in functions (defaultStandbyTaskAssignment etc) sitting in
>>> the ApplicationMetadata class. If we consider them as some default util
>>> functions, how about introducing moving those into their own static util
>>> methods to separate from the ApplicationMetadata “fact objects” ?
>>> Agreed. Updated in the latest revision of the kip. These have been moved
>>> to TaskAssignorUtils
>>>
>>> > 3. I personally prefer `NodeAssignment` to be a read-only object
>>> containing the decisions made by the assignor, including the
>>> requestFollowupRebalance flag. For manipulating the half-baked results
>>> inside the assignor itself, maybe we can just be flexible to let users use
>>> whatever struts / their own classes even, if they like. WDYT?
>>> Agreed. Updated in the latest version of the kip.
>>>
>>> > 1. For the API, thoughts on changing the method signature to return a
>>> (non-Optional) TaskAssignor? Then we can either have the default
>>> implementation return new HighAvailabilityTaskAssignor or just have a
>>> default implementation class that people can extend if they don't want to
>>> implement every method.
>>> Based on some other discussion, I actually decided to get rid of the
>>> plugin interface, and instead use config to specify individual plugin
>>> behaviour. So the method you're referring to is no longer part of the
>>> proposal.
>>>
>>> > 3. Speaking of ApplicationMetadata, the javadoc says it's read only
>>> but
>>> theres methods that return void on it? It's not totally clear to me how
>>> that interface is supposed to be used by the assignor. It'd be nice if we
>>> could flip that interface such that it becomes part of the output instead
>>> of an input to the plugin.
>>> I've moved those methods to a util class. They're really utility methods
>>> the assignor might want to call to do some default or optimized assignment
>>> for some cases like rack-awareness.
>>>
>>> > 4. We should consider wrapping UUID in a ProcessID class so that we
>>> control
>>> the interface (there are a few places where UUID is directly used).
>>> I like it. Updated the proposal.
>>>
>>> > 5. What does NodeState#newAssignmentForNode() do? I thought the point
>>> was
>>> for the plugin to make the assignment? Is that the result of the default
>>> logic?
>>> It doesn't need to be part of the interface. I've removed it.
>>>
>>> > re 2/6:
>>>
>>> I generally agree with these points, but I'd rather hash that out in a
>>> PR than in the KIP review, as it'll be clearer what gets used how. It seems
>>> to me (committers please correct me if I'm wrong) that as long as we're on
>>> the same page about what information the interfaces are returning, that's
>>> ok at this level of discussion.
>>>
>>> On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai 
>>> wrote:
>>>
>>>> Hello All,
>>>>
>>>> I'd like to start a discussion on KIP-924 (
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams)
>>>> which proposes an interface to allow users to plug into the streams
>>>> partition assignor. The motivation section in the KIP goes into some more
>>>> detail on why we think this is a useful addition. Thanks in advance for
>>>> your feedback!
>>>>
>>>> Best Regards,
>>>>
>>>> Rohan
>>>>
>>>>


Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-19 Thread Rohan Desai
Thanks for the feedback Bruno! For the most part I think it makes sense,
but leaving a couple follow-up thoughts/questions:

re 4: I think Sophie's point was slightly different - that we might want to
wrap the return type for `assign` in a class so that it's easily extensible.
This makes sense to me. Whether we do that or not, we can have the return
type be a Set instead of a Map as well.
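A hypothetical sketch of that wrapper idea (all names here are invented for illustration, not the final KIP-924 API):

```java
import java.util.Set;
import java.util.UUID;

public class AssignmentWrapperSketch {
    // One node's share of the assignment.
    static final class KafkaStreamsAssignment {
        private final UUID processId;
        private final Set<String> taskIds;
        KafkaStreamsAssignment(UUID processId, Set<String> taskIds) {
            this.processId = processId;
            this.taskIds = Set.copyOf(taskIds);
        }
        UUID processId() { return processId; }
        Set<String> taskIds() { return taskIds; }
    }

    // Wrapping the Set in a result class (rather than returning the bare
    // Set) means fields like a follow-up-rebalance request can be added
    // later without breaking existing assignor implementations.
    static final class TaskAssignmentResult {
        private final Set<KafkaStreamsAssignment> assignments;
        TaskAssignmentResult(Set<KafkaStreamsAssignment> assignments) {
            this.assignments = Set.copyOf(assignments);
        }
        Set<KafkaStreamsAssignment> assignments() { return assignments; }
    }
}
```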

re 6: Yes, it's a callback that's called with the final assignment. I like
your suggested name.

On Fri, Apr 5, 2024 at 12:17 PM Rohan Desai  wrote:

> Thanks for the feedback Sophie!
>
> re1: Totally agree. The fact that it's related to the partition assignor
> is clear from just `task.assignor`. I'll update.
> re3: This is a good point, and something I would find useful personally. I
> think its worth adding an interface that lets the plugin observe the final
> assignment. I'll add that.
> re4: I like the new `NodeAssignment` type. I'll update the KIP with that.
>
> On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai 
> wrote:
>
>> Thanks for the feedback so far! I think pretty much all of it is
>> reasonable. I'll reply to it inline:
>>
>> > 1. All the API logic is granular at the Task level, except the
>> previousOwnerForPartition func. I’m not clear what’s the motivation
>> behind it, does our controller also want to change how the
>> partitions->tasks mapping is formed?
>> You're right that this is out of place. I've removed this method as it's
>> not needed by the task assignor.
>>
>> > 2. Just on the API layering itself: it feels a bit weird to have the
>> three built-in functions (defaultStandbyTaskAssignment etc) sitting in
>> the ApplicationMetadata class. If we consider them as some default util
>> functions, how about introducing moving those into their own static util
>> methods to separate from the ApplicationMetadata “fact objects” ?
>> Agreed. Updated in the latest revision of the kip. These have been moved
>> to TaskAssignorUtils
>>
>> > 3. I personally prefer `NodeAssignment` to be a read-only object
>> containing the decisions made by the assignor, including the
>> requestFollowupRebalance flag. For manipulating the half-baked results
>> inside the assignor itself, maybe we can just be flexible to let users use
>> whatever struts / their own classes even, if they like. WDYT?
>> Agreed. Updated in the latest version of the kip.
>>
>> > 1. For the API, thoughts on changing the method signature to return a
>> (non-Optional) TaskAssignor? Then we can either have the default
>> implementation return new HighAvailabilityTaskAssignor or just have a
>> default implementation class that people can extend if they don't want to
>> implement every method.
>> Based on some other discussion, I actually decided to get rid of the
>> plugin interface, and instead use config to specify individual plugin
>> behaviour. So the method you're referring to is no longer part of the
>> proposal.
>>
>> > 3. Speaking of ApplicationMetadata, the javadoc says it's read only but
>> theres methods that return void on it? It's not totally clear to me how
>> that interface is supposed to be used by the assignor. It'd be nice if we
>> could flip that interface such that it becomes part of the output instead
>> of an input to the plugin.
>> I've moved those methods to a util class. They're really utility methods
>> the assignor might want to call to do some default or optimized assignment
>> for some cases like rack-awareness.
>>
>> > 4. We should consider wrapping UUID in a ProcessID class so that we
>> control
>> the interface (there are a few places where UUID is directly used).
>> I like it. Updated the proposal.
>>
>> > 5. What does NodeState#newAssignmentForNode() do? I thought the point
>> was
>> for the plugin to make the assignment? Is that the result of the default
>> logic?
>> It doesn't need to be part of the interface. I've removed it.
>>
>> > re 2/6:
>>
>> I generally agree with these points, but I'd rather hash that out in a PR
>> than in the KIP review, as it'll be clearer what gets used how. It seems to
>> me (committers please correct me if I'm wrong) that as long as we're on the
>> same page about what information the interfaces are returning, that's ok at
>> this level of discussion.
>>
>> On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai 
>> wrote:
>>
>>> Hello All,
>>>
>>> I'd like to start a discussion on KIP-924 (
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams)
>>> which proposes an interface to allow users to plug into the streams
>>> partition assignor. The motivation section in the KIP goes into some more
>>> detail on why we think this is a useful addition. Thanks in advance for
>>> your feedback!
>>>
>>> Best Regards,
>>>
>>> Rohan
>>>
>>>


[VOTE] KIP-924: customizable task assignment for Streams

2024-04-16 Thread Rohan Desai
https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams

As this KIP has been open for a while, and gone through a couple rounds of
review/revision, I'm calling a vote to get it approved.


Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-05 Thread Rohan Desai
Thanks for the feedback Sophie!

re1: Totally agree. The fact that it's related to the partition assignor is
clear from just `task.assignor`. I'll update.
re3: This is a good point, and something I would find useful personally. I
think it's worth adding an interface that lets the plugin observe the final
assignment. I'll add that.
re4: I like the new `NodeAssignment` type. I'll update the KIP with that.

On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai  wrote:

> Thanks for the feedback so far! I think pretty much all of it is
> reasonable. I'll reply to it inline:
>
> > 1. All the API logic is granular at the Task level, except the
> previousOwnerForPartition func. I’m not clear what’s the motivation behind
> it, does our controller also want to change how the partitions->tasks
> mapping is formed?
> You're right that this is out of place. I've removed this method as it's
> not needed by the task assignor.
>
> > 2. Just on the API layering itself: it feels a bit weird to have the
> three built-in functions (defaultStandbyTaskAssignment etc) sitting in the
> ApplicationMetadata class. If we consider them as some default util
> functions, how about introducing moving those into their own static util
> methods to separate from the ApplicationMetadata “fact objects” ?
> Agreed. Updated in the latest revision of the kip. These have been moved
> to TaskAssignorUtils
>
> > 3. I personally prefer `NodeAssignment` to be a read-only object
> containing the decisions made by the assignor, including the
> requestFollowupRebalance flag. For manipulating the half-baked results
> inside the assignor itself, maybe we can just be flexible to let users use
> whatever struts / their own classes even, if they like. WDYT?
> Agreed. Updated in the latest version of the kip.
>
> > 1. For the API, thoughts on changing the method signature to return a
> (non-Optional) TaskAssignor? Then we can either have the default
> implementation return new HighAvailabilityTaskAssignor or just have a
> default implementation class that people can extend if they don't want to
> implement every method.
> Based on some other discussion, I actually decided to get rid of the
> plugin interface, and instead use config to specify individual plugin
> behaviour. So the method you're referring to is no longer part of the
> proposal.
>
> > 3. Speaking of ApplicationMetadata, the javadoc says it's read only but
> theres methods that return void on it? It's not totally clear to me how
> that interface is supposed to be used by the assignor. It'd be nice if we
> could flip that interface such that it becomes part of the output instead
> of an input to the plugin.
> I've moved those methods to a util class. They're really utility methods
> the assignor might want to call to do some default or optimized assignment
> for some cases like rack-awareness.
>
> > 4. We should consider wrapping UUID in a ProcessID class so that we
> control
> the interface (there are a few places where UUID is directly used).
> I like it. Updated the proposal.
>
> > 5. What does NodeState#newAssignmentForNode() do? I thought the point
> was
> for the plugin to make the assignment? Is that the result of the default
> logic?
> It doesn't need to be part of the interface. I've removed it.
>
> > re 2/6:
>
> I generally agree with these points, but I'd rather hash that out in a PR
> than in the KIP review, as it'll be clearer what gets used how. It seems to
> me (committers please correct me if I'm wrong) that as long as we're on the
> same page about what information the interfaces are returning, that's ok at
> this level of discussion.
>
> On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai 
> wrote:
>
>> Hello All,
>>
>> I'd like to start a discussion on KIP-924 (
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams)
>> which proposes an interface to allow users to plug into the streams
>> partition assignor. The motivation section in the KIP goes into some more
>> detail on why we think this is a useful addition. Thanks in advance for
>> your feedback!
>>
>> Best Regards,
>>
>> Rohan
>>
>>


Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2023-11-09 Thread Rohan Desai
Thanks for the feedback so far! I think pretty much all of it is
reasonable. I'll reply to it inline:

> 1. All the API logic is granular at the Task level, except the
previousOwnerForPartition func. I’m not clear what’s the motivation behind
it, does our controller also want to change how the partitions->tasks
mapping is formed?
You're right that this is out of place. I've removed this method as it's
not needed by the task assignor.

> 2. Just on the API layering itself: it feels a bit weird to have the
three built-in functions (defaultStandbyTaskAssignment etc) sitting in the
ApplicationMetadata class. If we consider them as some default util
functions, how about introducing moving those into their own static util
methods to separate from the ApplicationMetadata “fact objects” ?
Agreed. Updated in the latest revision of the kip. These have been moved to
TaskAssignorUtils

> 3. I personally prefer `NodeAssignment` to be a read-only object
containing the decisions made by the assignor, including the
requestFollowupRebalance flag. For manipulating the half-baked results
inside the assignor itself, maybe we can just be flexible to let users use
whatever struts / their own classes even, if they like. WDYT?
Agreed. Updated in the latest version of the kip.

> 1. For the API, thoughts on changing the method signature to return a
(non-Optional) TaskAssignor? Then we can either have the default
implementation return new HighAvailabilityTaskAssignor or just have a
default implementation class that people can extend if they don't want to
implement every method.
Based on some other discussion, I actually decided to get rid of the plugin
interface, and instead use config to specify individual plugin behaviour.
So the method you're referring to is no longer part of the proposal.

> 3. Speaking of ApplicationMetadata, the javadoc says it's read only but
theres methods that return void on it? It's not totally clear to me how
that interface is supposed to be used by the assignor. It'd be nice if we
could flip that interface such that it becomes part of the output instead
of an input to the plugin.
I've moved those methods to a util class. They're really utility methods
the assignor might want to call to do some default or optimized assignment
for some cases like rack-awareness.

> 4. We should consider wrapping UUID in a ProcessID class so that we
control
the interface (there are a few places where UUID is directly used).
I like it. Updated the proposal.

> 5. What does NodeState#newAssignmentForNode() do? I thought the point was
for the plugin to make the assignment? Is that the result of the default
logic?
It doesn't need to be part of the interface. I've removed it.

> re 2/6:

I generally agree with these points, but I'd rather hash that out in a PR
than in the KIP review, as it'll be clearer what gets used how. It seems to
me (committers please correct me if I'm wrong) that as long as we're on the
same page about what information the interfaces are returning, that's ok at
this level of discussion.

On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai  wrote:

> Hello All,
>
> I'd like to start a discussion on KIP-924 (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams)
> which proposes an interface to allow users to plug into the streams
> partition assignor. The motivation section in the KIP goes into some more
> detail on why we think this is a useful addition. Thanks in advance for
> your feedback!
>
> Best Regards,
>
> Rohan
>
>


[DISCUSS] KIP-924: customizable task assignment for Streams

2023-11-07 Thread Rohan Desai
Hello All,

I'd like to start a discussion on KIP-924 (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams)
which proposes an interface to allow users to plug into the streams
partition assignor. The motivation section in the KIP goes into some more
detail on why we think this is a useful addition. Thanks in advance for
your feedback!

Best Regards,

Rohan


Re: [VOTE] KIP-761: Add Total Blocked Time Metric to Streams

2022-03-22 Thread Rohan Desai
Hello,

I discovered a bug in the design of this metric. The bug is documented
here: https://github.com/apache/kafka/pull/11805. We need to include the time
the producer spends waiting on topic metadata in the total blocked time.
But to do this we would need to add a new producer metric that tracks the
total time spent blocked on metadata. I've implemented this in a patch
here: https://github.com/apache/kafka/pull/11805

I'm hoping I can just update this KIP to include the new producer metric.


On Tue, Aug 31, 2021 at 1:07 AM Rohan Desai  wrote:

> FYI I've updated the metric names in the KIP to the form
> ".*-time-ns-total" and clarified that the times being measured are in
> nanoseconds.
>
> On Wed, Jul 21, 2021 at 5:09 PM Rohan Desai 
> wrote:
>
>> Now that the discussion thread's been open for a few days, I'm calling
>> for a vote on
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-761%3A+Add+Total+Blocked+Time+Metric+to+Streams
>>
>


Re: [DISCUSS] KIP-761: Add total blocked time metric to streams

2022-02-24 Thread Rohan Desai
Hello,

I discovered a bug in the design of this metric. The bug is documented
here: https://github.com/apache/kafka/pull/11805. We need to include time
the producer spends waiting on topic metadata into the total blocked time.
But to do this we would need to add a new producer metric that tracks the
total time spent blocked on metadata. I've implemented this in a patch
here: https://github.com/apache/kafka/pull/11805

I'm hoping I can just update this KIP to include the new producer metric.

On Mon, Jul 12, 2021 at 12:00 PM Rohan Desai 
wrote:

>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-761%3A+Add+Total+Blocked+Time+Metric+to+Streams
>


[jira] [Created] (KAFKA-13692) stream thread blocked-time-ns-total metric does not include producer metadata wait time

2022-02-24 Thread Rohan Desai (Jira)
Rohan Desai created KAFKA-13692:
---

 Summary: stream thread blocked-time-ns-total metric does not 
include producer metadata wait time
 Key: KAFKA-13692
 URL: https://issues.apache.org/jira/browse/KAFKA-13692
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.2.0
Reporter: Rohan Desai
 Fix For: 3.3.0


The stream thread blocked-time-ns-total metric does not include producer 
metadata wait time (time spent in `KafkaProducer.waitOnMetadata`). This can 
contribute significantly to actual total blocked time in some cases. For 
example, if a user deletes the streams sink topic, producers will wait until 
the max block timeout. This time does not get included in total blocked time 
when it should.
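As a sketch of the accounting fix described above (class and method names here are illustrative, not Kafka's internals): wrap the blocking metadata wait and add the elapsed nanoseconds into a blocked-time counter.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical tracker: any blocking call routed through timeBlocked()
// has its duration added to a monotonically increasing total, which a
// blocked-time-ns-total style metric could then report.
public class MetadataWaitTracker {
    private final AtomicLong blockedNs = new AtomicLong();

    // Measure the wall time spent inside a blocking call (e.g. the
    // producer's metadata wait) and accumulate it.
    public <T> T timeBlocked(Supplier<T> blockingCall) {
        long start = System.nanoTime();
        try {
            return blockingCall.get();
        } finally {
            blockedNs.addAndGet(System.nanoTime() - start);
        }
    }

    public long totalBlockedNs() {
        return blockedNs.get();
    }
}
```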



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [VOTE] KIP-761: Add Total Blocked Time Metric to Streams

2021-08-31 Thread Rohan Desai
FYI I've updated the metric names in the KIP to the form ".*-time-ns-total"
and clarified that the times being measured are in nanoseconds.

On Wed, Jul 21, 2021 at 5:09 PM Rohan Desai  wrote:

> Now that the discussion thread's been open for a few days, I'm calling for
> a vote on
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-761%3A+Add+Total+Blocked+Time+Metric+to+Streams
>


[jira] [Created] (KAFKA-13229) KIP-761: implement a total blocked time metric in Kafka Streams

2021-08-25 Thread Rohan Desai (Jira)
Rohan Desai created KAFKA-13229:
---

 Summary: KIP-761: implement a total blocked time metric in Kafka 
Streams
 Key: KAFKA-13229
 URL: https://issues.apache.org/jira/browse/KAFKA-13229
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 3.1.0
Reporter: Rohan Desai
 Fix For: 3.1.0


KIP-761 proposes a total blocked time metric in streams that measures the total 
time (since the thread was started) that a given thread is blocked on Kafka.

KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-761%3A+Add+Total+Blocked+Time+Metric+to+Streams
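For illustration, a monitoring system could derive a utilization ratio from the proposed counter in either of the two ways discussed in this thread: since thread start, or over a sampling window. This is a sketch of the arithmetic only, not Kafka code:

```java
// Utilization = fraction of time not blocked on Kafka.
public class Utilization {
    // Lifetime ratio, using the thread start timestamp plus the
    // cumulative blocked-time total.
    public static double sinceStart(long blockedNs, long threadStartNs, long nowNs) {
        long aliveNs = nowNs - threadStartNs;
        return aliveNs <= 0 ? 0.0 : 1.0 - ((double) blockedNs / aliveNs);
    }

    // Windowed ratio: difference two samples of the monotonically
    // increasing blocked-time counter over a known window.
    public static double windowed(long blockedNsStart, long blockedNsEnd, long windowNs) {
        return 1.0 - ((double) (blockedNsEnd - blockedNsStart) / windowNs);
    }
}
```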





Re: [DISCUSS] KIP-761: Add total blocked time metric to streams

2021-07-22 Thread Rohan Desai
re sophie:

The intent here was to include all blocked time (not just `RUNNING`). The
caller can window the total blocked time themselves, and that can be
compared with a timeseries of the state to understand the ratio in
different states. I'll update the KIP to include `committed`. The admin API
calls should be accounted for by the admin client iotime/iowaittime
metrics.

On Tue, Jul 20, 2021 at 11:49 PM Rohan Desai 
wrote:

> > I remember now that we moved the round-trip PID's txn completion logic
> into
> init-transaction and commit/abort-transaction. So I think we'd count time
> as in StreamsProducer#initTransaction as well (admittedly it is in most
> cases a one-time thing).
>
> Makes sense - I'll update the KIP
>
> On Tue, Jul 20, 2021 at 11:48 PM Rohan Desai 
> wrote:
>
>>
>> > I had a question - it seems like from the descriptions of
>> `txn-commit-time-total` and `offset-commit-time-total` that they measure
>> similar processes for ALOS and EOS, but only `txn-commit-time-total` is
>> included in `blocked-time-total`. Why isn't `offset-commit-time-total` also
>> included?
>>
>> I've updated the KIP to include it.
>>
>> > Aside from `flush-time-total`, `txn-commit-time-total` and
>> `offset-commit-time-total`, which will be producer/consumer client
>> metrics,
>> the rest of the metrics will be streams metrics that will be thread level,
>> is that right?
>>
>> Based on the feedback from Guozhang, I've updated the KIP to reflect that
>> the lower-level metrics are all client metrics that are then summed to
>> compute the blocked time metric, which is a Streams metric.
>>
>> On Tue, Jul 20, 2021 at 11:58 AM Rohan Desai 
>> wrote:
>>
>>> > Similarly, I think "txn-commit-time-total" and
>>> "offset-commit-time-total" may better be inside producer and consumer
>>> clients respectively.
>>>
>>> I agree for offset-commit-time-total. For txn-commit-time-total I'm
>>> proposing we measure `StreamsProducer.commitTransaction`, which wraps
>>> multiple producer calls (sendOffsets, commitTransaction)
>>>
>>> > > For "txn-commit-time-total" specifically, besides
>>> producer.commitTxn.
>>> other txn-related calls may also be blocking, including
>>> producer.beginTxn/abortTxn, I saw you mentioned "txn-begin-time-total"
>>> later in the doc, but did not include it as a separate metric, and
>>> similarly, should we have a `txn-abort-time-total` as well? If yes,
>>> could
>>> you update the KIP page accordingly.
>>>
>>> `beginTransaction` is not blocking - I meant to remove that from that
>>> doc. I'll add something for abort.
>>>
>>> On Mon, Jul 19, 2021 at 11:55 PM Rohan Desai 
>>> wrote:
>>>
>>>> Thanks for the review Guozhang! responding to your feedback inline:
>>>>
>>>> > 1) I agree that the current ratio metrics is just "snapshot in
>>>> point", and
>>>> more flexible metrics that would allow reporters to calculate based on
>>>> window intervals are better. However, the current mechanism of the
>>>> proposed
>>>> metrics assumes the thread->clients mapping as of today, where each
>>>> thread
>>>> would own exclusively one main consumer, restore consumer, producer and
>>>> an
>>>> admin client. But this mapping may be subject to change in the future.
>>>> Have
>>>> you thought about how this metric can be extended when, e.g. the
>>>> embedded
>>>> clients and stream threads are de-coupled?
>>>>
>>>> Of course this depends on how exactly we refactor the runtime -
>>>> assuming that we plan to factor out consumers into an "I/O" layer that is
>>>> responsible for receiving records and enqueuing them to be processed by
>>>> processing threads, then I think it should be reasonable to count the time
>>>> we spend blocked on this internal queue(s) as blocked. The main concern
>>>> there to me is that the I/O layer would be doing something expensive like
>>>> decompression that shouldn't be counted as "blocked". But if that really is
>>>> so expensive that it starts to throw off our ratios then it's probably
>>>> indicative of a larger problem that the "i/o layer" is a bottleneck and it
>>>> would be worth refactoring so that decompression (or insert other expensive
>>>> thing here) can also be done on the processing threads.
>>>

[VOTE] KIP-761: Add Total Blocked Time Metric to Streams

2021-07-21 Thread Rohan Desai
Now that the discussion thread's been open for a few days, I'm calling for
a vote on
https://cwiki.apache.org/confluence/display/KAFKA/KIP-761%3A+Add+Total+Blocked+Time+Metric+to+Streams


Re: [DISCUSS] KIP-761: Add total blocked time metric to streams

2021-07-20 Thread Rohan Desai
> I remember now that we moved the round-trip PID's txn completion logic
into
init-transaction and commit/abort-transaction. So I think we'd count time
as in StreamsProducer#initTransaction as well (admittedly it is in most
cases a one-time thing).

Makes sense - I'll update the KIP

On Tue, Jul 20, 2021 at 11:48 PM Rohan Desai 
wrote:

>
> > I had a question - it seems like from the descriptions of
> `txn-commit-time-total` and `offset-commit-time-total` that they measure
> similar processes for ALOS and EOS, but only `txn-commit-time-total` is
> included in `blocked-time-total`. Why isn't `offset-commit-time-total` also
> included?
>
> I've updated the KIP to include it.
>
> > Aside from `flush-time-total`, `txn-commit-time-total` and
> `offset-commit-time-total`, which will be producer/consumer client metrics,
> the rest of the metrics will be streams metrics that will be thread level,
> is that right?
>
> Based on the feedback from Guozhang, I've updated the KIP to reflect that
> the lower-level metrics are all client metrics that are then summed to
> compute the blocked time metric, which is a Streams metric.
>
> On Tue, Jul 20, 2021 at 11:58 AM Rohan Desai 
> wrote:
>
>> > Similarly, I think "txn-commit-time-total" and
>> "offset-commit-time-total" may better be inside producer and consumer
>> clients respectively.
>>
>> I agree for offset-commit-time-total. For txn-commit-time-total I'm
>> proposing we measure `StreamsProducer.commitTransaction`, which wraps
>> multiple producer calls (sendOffsets, commitTransaction)
>>
>> > > For "txn-commit-time-total" specifically, besides producer.commitTxn.
>> other txn-related calls may also be blocking, including
>> producer.beginTxn/abortTxn, I saw you mentioned "txn-begin-time-total"
>> later in the doc, but did not include it as a separate metric, and
>> similarly, should we have a `txn-abort-time-total` as well? If yes, could
>> you update the KIP page accordingly.
>>
>> `beginTransaction` is not blocking - I meant to remove that from that
>> doc. I'll add something for abort.
>>
>> On Mon, Jul 19, 2021 at 11:55 PM Rohan Desai 
>> wrote:
>>
>>> Thanks for the review Guozhang! responding to your feedback inline:
>>>
>>> > 1) I agree that the current ratio metrics is just "snapshot in
>>> point", and
>>> more flexible metrics that would allow reporters to calculate based on
>>> window intervals are better. However, the current mechanism of the
>>> proposed
>>> metrics assumes the thread->clients mapping as of today, where each
>>> thread
>>> would own exclusively one main consumer, restore consumer, producer and
>>> an
>>> admin client. But this mapping may be subject to change in the future.
>>> Have
>>> you thought about how this metric can be extended when, e.g. the embedded
>>> clients and stream threads are de-coupled?
>>>
>>> Of course this depends on how exactly we refactor the runtime - assuming
>>> that we plan to factor out consumers into an "I/O" layer that is
>>> responsible for receiving records and enqueuing them to be processed by
>>> processing threads, then I think it should be reasonable to count the time
>>> we spend blocked on this internal queue(s) as blocked. The main concern
>>> there to me is that the I/O layer would be doing something expensive like
>>> decompression that shouldn't be counted as "blocked". But if that really is
>>> so expensive that it starts to throw off our ratios then it's probably
>>> indicative of a larger problem that the "i/o layer" is a bottleneck and it
>>> would be worth refactoring so that decompression (or insert other expensive
>>> thing here) can also be done on the processing threads.
>>>
>>> > 2) [This and all below are minor comments] The "flush-time-total" may
>>> better be a producer client metric, as "flush-wait-time-total", than a
>>> streams metric, though the streams-level "total-blocked" can still
>>> leverage
>>> it. Similarly, I think "txn-commit-time-total" and
>>> "offset-commit-time-total" may better be inside producer and consumer
>>> clients respectively.
>>>
>>> Good call - I'll update the KIP
>>>
>>> > 3) The doc was not very clear on how "thread-start-time" would be
>>> needed
>>> when calculating streams utilization along with total-blocked time, could
>>> you elaborate a bit more in the KIP?

Re: [DISCUSS] KIP-761: Add total blocked time metric to streams

2021-07-20 Thread Rohan Desai
> I had a question - it seems like from the descriptions of
`txn-commit-time-total` and `offset-commit-time-total` that they measure
similar processes for ALOS and EOS, but only `txn-commit-time-total` is
included in `blocked-time-total`. Why isn't `offset-commit-time-total` also
included?

I've updated the KIP to include it.

> Aside from `flush-time-total`, `txn-commit-time-total` and
`offset-commit-time-total`, which will be producer/consumer client metrics,
the rest of the metrics will be streams metrics that will be thread level,
is that right?

Based on the feedback from Guozhang, I've updated the KIP to reflect that
the lower-level metrics are all client metrics that are then summed to
compute the blocked time metric, which is a Streams metric.
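As a sketch of that aggregation (the metric names follow the KIP; the map-based plumbing is illustrative, not Kafka's internals), the thread-level blocked-time total is just the sum of the per-client totals:

```java
import java.util.Map;

public class BlockedTime {
    // Sum the lower-level client metric totals (producer/consumer/admin)
    // into the thread-level Streams blocked-time metric.
    public static long totalBlockedNs(Map<String, Long> clientTotalsNs) {
        return clientTotalsNs.values().stream().mapToLong(Long::longValue).sum();
    }
}
```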

On Tue, Jul 20, 2021 at 11:58 AM Rohan Desai 
wrote:

> > Similarly, I think "txn-commit-time-total" and
> "offset-commit-time-total" may better be inside producer and consumer
> clients respectively.
>
> I agree for offset-commit-time-total. For txn-commit-time-total I'm
> proposing we measure `StreamsProducer.commitTransaction`, which wraps
> multiple producer calls (sendOffsets, commitTransaction)
>
> > > For "txn-commit-time-total" specifically, besides producer.commitTxn.
> other txn-related calls may also be blocking, including
> producer.beginTxn/abortTxn, I saw you mentioned "txn-begin-time-total"
> later in the doc, but did not include it as a separate metric, and
> similarly, should we have a `txn-abort-time-total` as well? If yes, could
> you update the KIP page accordingly.
>
> `beginTransaction` is not blocking - I meant to remove that from that doc.
> I'll add something for abort.
>
> On Mon, Jul 19, 2021 at 11:55 PM Rohan Desai 
> wrote:
>
>> Thanks for the review Guozhang! responding to your feedback inline:
>>
>> > 1) I agree that the current ratio metrics is just "snapshot in point",
>> and
>> more flexible metrics that would allow reporters to calculate based on
>> window intervals are better. However, the current mechanism of the
>> proposed
>> metrics assumes the thread->clients mapping as of today, where each thread
>> would own exclusively one main consumer, restore consumer, producer and an
>> admin client. But this mapping may be subject to change in the future.
>> Have
>> you thought about how this metric can be extended when, e.g. the embedded
>> clients and stream threads are de-coupled?
>>
>> Of course this depends on how exactly we refactor the runtime - assuming
>> that we plan to factor out consumers into an "I/O" layer that is
>> responsible for receiving records and enqueuing them to be processed by
>> processing threads, then I think it should be reasonable to count the time
>> we spend blocked on this internal queue(s) as blocked. The main concern
>> there to me is that the I/O layer would be doing something expensive like
>> decompression that shouldn't be counted as "blocked". But if that really is
>> so expensive that it starts to throw off our ratios then it's probably
>> indicative of a larger problem that the "i/o layer" is a bottleneck and it
>> would be worth refactoring so that decompression (or insert other expensive
>> thing here) can also be done on the processing threads.
>>
>> > 2) [This and all below are minor comments] The "flush-time-total" may
>> better be a producer client metric, as "flush-wait-time-total", than a
>> streams metric, though the streams-level "total-blocked" can still
>> leverage
>> it. Similarly, I think "txn-commit-time-total" and
>> "offset-commit-time-total" may better be inside producer and consumer
>> clients respectively.
>>
>> Good call - I'll update the KIP
>>
>> > 3) The doc was not very clear on how "thread-start-time" would be
>> needed
>> when calculating streams utilization along with total-blocked time, could
>> you elaborate a bit more in the KIP?
>>
>> Yes, will do.
>>
>> > For "txn-commit-time-total" specifically, besides producer.commitTxn.
>> other txn-related calls may also be blocking, including
>> producer.beginTxn/abortTxn, I saw you mentioned "txn-begin-time-total"
>> later in the doc, but did not include it as a separate metric, and
>> similarly, should we have a `txn-abort-time-total` as well? If yes, could
>> you update the KIP page accordingly.
>>
>> Ack.
>>
>> On Mon, Jul 12, 2021 at 11:29 PM Rohan Desai 
>> wrote:
>>
>>> Hello All,
>>>
>>> I'd like to start a discussion on the KIP linked above which proposes
>>> some metrics that we would find useful to help measure whether a Kafka
>>> Streams application is saturated. The motivation section in the KIP goes
>>> into some more detail on why we think this is a useful addition to the
>>> metrics already implemented. Thanks in advance for your feedback!
>>>
>>> Best Regards,
>>>
>>> Rohan
>>>
>>> On Mon, Jul 12, 2021 at 12:00 PM Rohan Desai 
>>> wrote:
>>>
>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-761%3A+Add+Total+Blocked+Time+Metric+to+Streams
>>>>
>>>


Re: [DISCUSS] KIP-761: Add total blocked time metric to streams

2021-07-20 Thread Rohan Desai
> Similarly, I think "txn-commit-time-total" and
"offset-commit-time-total" may better be inside producer and consumer
clients respectively.

I agree for offset-commit-time-total. For txn-commit-time-total I'm
proposing we measure `StreamsProducer.commitTransaction`, which wraps
multiple producer calls (sendOffsets, commitTransaction)

> > For "txn-commit-time-total" specifically, besides producer.commitTxn.
other txn-related calls may also be blocking, including
producer.beginTxn/abortTxn, I saw you mentioned "txn-begin-time-total"
later in the doc, but did not include it as a separate metric, and
similarly, should we have a `txn-abort-time-total` as well? If yes, could
you update the KIP page accordingly.

`beginTransaction` is not blocking - I meant to remove that from that doc.
I'll add something for abort.

On Mon, Jul 19, 2021 at 11:55 PM Rohan Desai 
wrote:

> Thanks for the review Guozhang! responding to your feedback inline:
>
> > 1) I agree that the current ratio metrics is just "snapshot in point",
> and
> more flexible metrics that would allow reporters to calculate based on
> window intervals are better. However, the current mechanism of the proposed
> metrics assumes the thread->clients mapping as of today, where each thread
> would own exclusively one main consumer, restore consumer, producer and an
> admin client. But this mapping may be subject to change in the future. Have
> you thought about how this metric can be extended when, e.g. the embedded
> clients and stream threads are de-coupled?
>
> Of course this depends on how exactly we refactor the runtime - assuming
> that we plan to factor out consumers into an "I/O" layer that is
> responsible for receiving records and enqueuing them to be processed by
> processing threads, then I think it should be reasonable to count the time
> we spend blocked on this internal queue(s) as blocked. The main concern
> there to me is that the I/O layer would be doing something expensive like
> decompression that shouldn't be counted as "blocked". But if that really is
> so expensive that it starts to throw off our ratios then it's probably
> indicative of a larger problem that the "i/o layer" is a bottleneck and it
> would be worth refactoring so that decompression (or insert other expensive
> thing here) can also be done on the processing threads.
>
> > 2) [This and all below are minor comments] The "flush-time-total" may
> better be a producer client metric, as "flush-wait-time-total", than a
> streams metric, though the streams-level "total-blocked" can still leverage
> it. Similarly, I think "txn-commit-time-total" and
> "offset-commit-time-total" may better be inside producer and consumer
> clients respectively.
>
> Good call - I'll update the KIP
>
> > 3) The doc was not very clear on how "thread-start-time" would be needed
> when calculating streams utilization along with total-blocked time, could
> you elaborate a bit more in the KIP?
>
> Yes, will do.
>
> > For "txn-commit-time-total" specifically, besides producer.commitTxn.
> other txn-related calls may also be blocking, including
> producer.beginTxn/abortTxn, I saw you mentioned "txn-begin-time-total"
> later in the doc, but did not include it as a separate metric, and
> similarly, should we have a `txn-abort-time-total` as well? If yes, could
> you update the KIP page accordingly.
>
> Ack.
>
> On Mon, Jul 12, 2021 at 11:29 PM Rohan Desai 
> wrote:
>
>> Hello All,
>>
>> I'd like to start a discussion on the KIP linked above which proposes
>> some metrics that we would find useful to help measure whether a Kafka
>> Streams application is saturated. The motivation section in the KIP goes
>> into some more detail on why we think this is a useful addition to the
>> metrics already implemented. Thanks in advance for your feedback!
>>
>> Best Regards,
>>
>> Rohan
>>
>> On Mon, Jul 12, 2021 at 12:00 PM Rohan Desai 
>> wrote:
>>
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-761%3A+Add+Total+Blocked+Time+Metric+to+Streams
>>>
>>


Re: [DISCUSS] KIP-761: Add total blocked time metric to streams

2021-07-19 Thread Rohan Desai
Thanks for the review Guozhang! responding to your feedback inline:

> 1) I agree that the current ratio metrics is just "snapshot in point", and
more flexible metrics that would allow reporters to calculate based on
window intervals are better. However, the current mechanism of the proposed
metrics assumes the thread->clients mapping as of today, where each thread
would own exclusively one main consumer, restore consumer, producer and an
admin client. But this mapping may be subject to change in the future. Have
you thought about how this metric can be extended when, e.g. the embedded
clients and stream threads are de-coupled?

Of course this depends on how exactly we refactor the runtime - assuming
that we plan to factor out consumers into an "I/O" layer that is
responsible for receiving records and enqueuing them to be processed by
processing threads, then I think it should be reasonable to count the time
we spend blocked on this internal queue(s) as blocked. The main concern
there to me is that the I/O layer would be doing something expensive like
decompression that shouldn't be counted as "blocked". But if that really is
so expensive that it starts to throw off our ratios then it's probably
indicative of a larger problem that the "i/o layer" is a bottleneck and it
would be worth refactoring so that decompression (or insert other expensive
thing here) can also be done on the processing threads.

> 2) [This and all below are minor comments] The "flush-time-total" may
better be a producer client metric, as "flush-wait-time-total", than a
streams metric, though the streams-level "total-blocked" can still leverage
it. Similarly, I think "txn-commit-time-total" and
"offset-commit-time-total" may better be inside producer and consumer
clients respectively.

Good call - I'll update the KIP

> 3) The doc was not very clear on how "thread-start-time" would be needed
when calculating streams utilization along with total-blocked time, could
you elaborate a bit more in the KIP?

Yes, will do.

> For "txn-commit-time-total" specifically, besides producer.commitTxn.
other txn-related calls may also be blocking, including
producer.beginTxn/abortTxn, I saw you mentioned "txn-begin-time-total"
later in the doc, but did not include it as a separate metric, and
similarly, should we have a `txn-abort-time-total` as well? If yes, could
you update the KIP page accordingly.

Ack.

On Mon, Jul 12, 2021 at 11:29 PM Rohan Desai 
wrote:

> Hello All,
>
> I'd like to start a discussion on the KIP linked above which proposes some
> metrics that we would find useful to help measure whether a Kafka Streams
> application is saturated. The motivation section in the KIP goes into some
> more detail on why we think this is a useful addition to the metrics
> already implemented. Thanks in advance for your feedback!
>
> Best Regards,
>
> Rohan
>
> On Mon, Jul 12, 2021 at 12:00 PM Rohan Desai 
> wrote:
>
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-761%3A+Add+Total+Blocked+Time+Metric+to+Streams
>>
>


Re: [DISCUSS] KIP-761: Add total blocked time metric to streams

2021-07-12 Thread Rohan Desai
Hello All,

I'd like to start a discussion on the KIP linked above which proposes some
metrics that we would find useful to help measure whether a Kafka Streams
application is saturated. The motivation section in the KIP goes into some
more detail on why we think this is a useful addition to the metrics
already implemented. Thanks in advance for your feedback!

Best Regards,

Rohan

On Mon, Jul 12, 2021 at 12:00 PM Rohan Desai 
wrote:

>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-761%3A+Add+Total+Blocked+Time+Metric+to+Streams
>


[DISCUSS] KIP-761: Add total blocked time metric to streams

2021-07-12 Thread Rohan Desai
https://cwiki.apache.org/confluence/display/KAFKA/KIP-761%3A+Add+Total+Blocked+Time+Metric+to+Streams


[jira] [Created] (KAFKA-12707) KafkaProducer should have a clearer error message on sasl.mechanism misconfiguration

2021-04-21 Thread Rohan Desai (Jira)
Rohan Desai created KAFKA-12707:
---

 Summary: KafkaProducer should have a clearer error message on 
sasl.mechanism misconfiguration
 Key: KAFKA-12707
 URL: https://issues.apache.org/jira/browse/KAFKA-12707
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.0.0
Reporter: Rohan Desai


Not sure if this is producer-specific, but I accidentally configured my 
producer by setting:

```

sasl.mechanism=plain

```

instead of 

```

sasl.mechanism=PLAIN

```

 

When I did this, the producer just hangs and logs in a loop like this, which 
isn't very informative:
[2021-04-21 21:33:20,519] WARN [Producer clientId=producer-1] Bootstrap broker 
pkc-g91q1.us-west-2.aws.stag.cpdev.cloud:9092 (id: -1 rack: null) disconnected 
(org.apache.kafka.clients.NetworkClient:1050)
[2021-04-21 21:33:21,584] INFO [Producer clientId=producer-1] Failed to create 
channel due to  (org.apache.kafka.common.network.SaslChannelBuilder:239)
org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure 
SaslClientAuthenticator
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed 
to create SaslClient with mechanism plain
[2021-04-21 21:33:21,584] WARN [Producer clientId=producer-1] Error connecting 
to node pkc-g91q1.us-west-2.aws.stag.cpdev.cloud:9092 (id: -1 rack: null) 
(org.apache.kafka.clients.NetworkClient:982)
java.io.IOException: Channel could not be created for socket 
java.nio.channels.SocketChannel[closed]
at 
org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:348)
at 
org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329)
at org.apache.kafka.common.network.Selector.connect(Selector.java:256)
at 
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:977)
at 
org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73)
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1148)
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1036)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: 
org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure 
SaslClientAuthenticator
at 
org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:240)
at 
org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
... 10 more
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed 
to configure SaslClientAuthenticator
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed 
to create SaslClient with mechanism plain
[2021-04-21 21:33:21,584] WARN [Producer clientId=producer-1] Bootstrap broker 
pkc-g91q1.us-west-2.aws.stag.cpdev.cloud:9092 (id: -1 rack: null) disconnected 
(org.apache.kafka.clients.NetworkClient:1050)
 

It would be better to early-exit with a clear error message
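A fail-fast validation along those lines might look like the following sketch; the helper and the accepted mechanism set are assumptions for illustration, not Kafka's actual configuration API:

```java
import java.util.Locale;
import java.util.Set;

public class SaslMechanismCheck {
    // Common SASL mechanism names; the real accepted set depends on the
    // broker and the configured login modules.
    private static final Set<String> KNOWN =
        Set.of("PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512", "GSSAPI", "OAUTHBEARER");

    // Reject an unknown mechanism up front with an actionable message,
    // instead of retrying the connection forever.
    public static String validate(String mechanism) {
        if (KNOWN.contains(mechanism)) {
            return mechanism;
        }
        String upper = mechanism.toUpperCase(Locale.ROOT);
        if (KNOWN.contains(upper)) {
            throw new IllegalArgumentException("Unknown sasl.mechanism '" + mechanism
                + "'; mechanism names are case-sensitive. Did you mean '" + upper + "'?");
        }
        throw new IllegalArgumentException("Unknown sasl.mechanism '" + mechanism + "'");
    }
}
```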





[jira] [Created] (KAFKA-10585) Kafka Streams should clean up the state store directory from cleanup

2020-10-08 Thread Rohan Desai (Jira)
Rohan Desai created KAFKA-10585:
---

 Summary: Kafka Streams should clean up the state store directory 
from cleanup
 Key: KAFKA-10585
 URL: https://issues.apache.org/jira/browse/KAFKA-10585
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Rohan Desai


Currently, `KafkaStreams.cleanup` cleans up all the task-level directories and 
the global directory. However it doesn't clean up the enclosing state store 
directory, though streams does create this directory when it initializes the 
state for the streams app. Feels like it should remove this directory when it 
cleans up.

We notice this in ksql quite often, since every new query is a new streams app. 
Over time, we see lots of state store directories left around for old queries.
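The proposed fix could look roughly like this sketch (the helper is illustrative; KafkaStreams' real cleanup logic lives elsewhere): after the task-level and global directories are removed, delete the enclosing state directory if it is now empty.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class StateDirCleanup {
    // Delete the app's state directory once task-level cleanup has
    // emptied it; returns true if the directory was removed.
    public static boolean removeIfEmpty(Path stateDir) throws IOException {
        if (!Files.isDirectory(stateDir)) {
            return false;
        }
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(stateDir)) {
            if (entries.iterator().hasNext()) {
                return false; // other task dirs (or another app) still live here
            }
        }
        Files.delete(stateDir);
        return true;
    }
}
```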





[jira] [Created] (KAFKA-9659) Kafka Streams / Consumer fails on "fatal exception: group.instance.id gets fenced"

2020-03-04 Thread Rohan Desai (Jira)
Rohan Desai created KAFKA-9659:
--

 Summary: Kafka Streams / Consumer fails on "fatal exception: 
group.instance.id gets fenced"
 Key: KAFKA-9659
 URL: https://issues.apache.org/jira/browse/KAFKA-9659
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.5.0
Reporter: Rohan Desai


I'm running a KSQL query, which underneath is built into a Kafka Streams 
application. The application has been running without issue for a few days, 
until today, when all the streams threads exited with:

 

```

[ERROR] 2020-03-05 00:57:58,776 
[_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2]
 org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - 
[Consumer instanceId=ksql-1-2, 
clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer,
 groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] 
Received fatal exception: group.instance.id gets fenced
[ERROR] 2020-03-05 00:57:58,776 
[_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2]
 org.apache.kafka.clients.consumer.internals.AbstractCoordinator onFailure - 
[Consumer instanceId=ksql-1-2, 
clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer,
 groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] 
Caught fenced group.instance.id Optional[ksql-1-2] error in heartbeat thread
[ERROR] 2020-03-05 00:57:58,776 
[_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2]
 org.apache.kafka.streams.processor.internals.StreamThread run - stream-thread 
[_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2]
 Encountered the following unexpected Kafka exception during processing, this 
usually indicate Streams internal errors:
org.apache.kafka.common.errors.FencedInstanceIdException: The broker rejected 
this static consumer since another consumer with the same group.instance.id has 
registered with a different member.id.
[INFO] 2020-03-05 00:57:58,776 
[_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2]
 org.apache.kafka.streams.processor.internals.StreamThread setState - 
stream-thread 
[_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2]
 State transition from RUNNING to PENDING_SHUTDOWN

```


This event coincided with downtime on a broker (broker 2), as measured by a 
healthchecking service that periodically pings it with a produce/consume round 
trip.
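
For reference, the static-membership settings involved here can be sketched 
with plain `Properties`; the broker address and group id below are 
placeholders, but `group.instance.id` and `session.timeout.ms` are the 
consumer configs that govern this fencing behavior (the broker fences a static 
member when another consumer registers with the same `group.instance.id` but a 
different `member.id`):

```java
import java.util.Properties;

public class StaticMemberConfig {
    // Sketch of a static-membership consumer config. Values marked
    // "placeholder" are illustrative, not from the ticket.
    static Properties consumerProps(String instanceId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092"); // placeholder
        props.put("group.id", "my-group");             // placeholder
        props.put("group.instance.id", instanceId);    // static member id, e.g. "ksql-1-2"
        props.put("session.timeout.ms", "30000");      // window before a silent static member is evicted
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("ksql-1-2");
        System.out.println(props.getProperty("group.instance.id"));
    }
}
```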


I've attached the KSQL and Kafka Streams logs to this ticket. Here's a summary 
for one of the streams threads (instance id `ksql-1-2`):


Around 00:56:36 the coordinator fails over from b11 to b2:

```

[INFO] 2020-03-05 00:56:36,258 
[_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2]
 org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - 
[Consumer instanceId=ksql-1-2, 
clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer,
 groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Attempt to 
heartbeat failed since coordinator 
b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483636 rack: null) 
is either not started or not valid.
[INFO] 2020-03-05 00:56:36,258 
[_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2]
 org.apache.kafka.clients.consumer.internals.AbstractCoordinator 
markCoordinatorUnknown - [Consumer instanceId=ksql-1-2, 
clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer,
 groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Group 
coordinator b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483636 
rack: null) is unavailable or invalid, will attempt rediscovery
[INFO] 2020-03-05 00:56:36,270 
[_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2]
 org.apache.kafka.clients.consumer.internals.AbstractCoordinator onSuccess - 
[Consumer instanceId=ksql-1-2, 
clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer,
 groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Discovered 
group coordinator b2-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 
2147483645 rack: null)

```


A few seconds la

[VOTE] KIP-425: Add some Log4J Kafka Appender Properties for Producing to Secured Brokers

2019-02-04 Thread Rohan Desai
Calling a vote for this since it's pretty trivial/non-controversial

https://cwiki.apache.org/confluence/display/KAFKA/KIP-425%3A+Add+some+Log4J+Kafka+Appender+Properties+for+Producing+to+Secured+Brokers


Re: KIP-425

2019-02-02 Thread Rohan Desai
Please ignore. Starting a new one with the right subject.

On Sat, Feb 2, 2019 at 4:23 PM Rohan Desai  wrote:

> Starting a mail thread to discuss KIP-425: Add some Log4J Kafka Appender
> Properties for Producing to Secured Brokers
>


KIP-425

2019-02-02 Thread Rohan Desai
Starting a mail thread to discuss KIP-425: Add some Log4J Kafka Appender
Properties for Producing to Secured Brokers


[DISCUSS] KIP-425: Add some Log4J Kafka Appender Properties for Producing to Secured Brokers

2019-02-02 Thread Rohan Desai
https://cwiki.apache.org/confluence/display/KAFKA/KIP-425%3A+Add+some+Log4J+Kafka+Appender+Properties+for+Producing+to+Secured+Brokers


[jira] [Created] (KAFKA-7896) Add some Log4J Kafka Properties for Producing to Secured Brokers

2019-02-02 Thread Rohan Desai (JIRA)
Rohan Desai created KAFKA-7896:
--

 Summary: Add some Log4J Kafka Properties for Producing to Secured 
Brokers
 Key: KAFKA-7896
 URL: https://issues.apache.org/jira/browse/KAFKA-7896
 Project: Kafka
  Issue Type: Bug
Reporter: Rohan Desai


The existing Log4J Kafka appender supports producing to brokers that use the 
GSSAPI (Kerberos) SASL mechanism, and only supports configuring JAAS via a 
JAAS config file. Filing this issue to cover extending the appender to the 
PLAIN mechanism and to support configuring JAAS via an inline configuration.
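
For context, a sketch of what the proposed inline configuration might look 
like; the `saslMechanism` and `clientJaasConf` appender property names follow 
the KIP's proposal but are illustrative here, and the broker list, topic, and 
credentials are placeholders:

```properties
# Illustrative sketch -- property names follow the KIP proposal;
# broker list, topic, and credentials are placeholders.
log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.brokerList=broker:9092
log4j.appender.kafka.topic=app-logs
log4j.appender.kafka.securityProtocol=SASL_PLAINTEXT
log4j.appender.kafka.saslMechanism=PLAIN
log4j.appender.kafka.clientJaasConf=org.apache.kafka.common.security.plain.PlainLoginModule required username="appuser" password="app-secret";
```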



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Request for Confluence access

2019-02-01 Thread Rohan Desai
Hello,

I want to write a KIP and need access to do so in Confluence.


[jira] [Created] (KAFKA-7311) Sender should reset next batch expiry time between poll loops

2018-08-18 Thread Rohan Desai (JIRA)
Rohan Desai created KAFKA-7311:
--

 Summary: Sender should reset next batch expiry time between poll 
loops
 Key: KAFKA-7311
 URL: https://issues.apache.org/jira/browse/KAFKA-7311
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.1.0
Reporter: Rohan Desai
 Fix For: 2.1.0


Sender does not reset next batch expiry time between poll loops. This means 
that once it crosses the expiry time of the first batch, it starts spinning on 
epoll with a timeout of 0, which consumes a lot of CPU. We observed this 
running KSQL when investigating why throughput would drop after about 10 
minutes (the default delivery timeout).
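
The idea behind the fix can be sketched as follows; this is an illustration, 
not the actual Sender code, and the class, field, and method names here are 
hypothetical:

```java
import java.util.List;

public class ExpiryTracker {
    private long nextBatchExpiryMs = Long.MAX_VALUE;

    // Compute the poll timeout from the earliest batch deadline.
    // The key line is the reset: without it, a deadline that passed in a
    // previous loop keeps nextBatchExpiryMs in the past, so the timeout
    // stays 0 and the loop spins on epoll, burning CPU.
    long pollTimeoutMs(List<Long> batchDeadlinesMs, long nowMs) {
        nextBatchExpiryMs = Long.MAX_VALUE; // reset between poll loops (the fix)
        for (long deadlineMs : batchDeadlinesMs) {
            nextBatchExpiryMs = Math.min(nextBatchExpiryMs, deadlineMs);
        }
        return Math.max(0, nextBatchExpiryMs - nowMs);
    }

    public static void main(String[] args) {
        ExpiryTracker tracker = new ExpiryTracker();
        System.out.println(tracker.pollTimeoutMs(List.of(100L, 200L), 50L)); // 50
        // A later loop with fresher batches still gets a positive timeout:
        System.out.println(tracker.pollTimeoutMs(List.of(300L), 250L));      // 50
    }
}
```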



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7064) "Unexpected resource type GROUP" when describing broker configs using latest admin client

2018-06-15 Thread Rohan Desai (JIRA)
Rohan Desai created KAFKA-7064:
--

 Summary: "Unexpected resource type GROUP" when describing broker 
configs using latest admin client
 Key: KAFKA-7064
 URL: https://issues.apache.org/jira/browse/KAFKA-7064
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.0.0
Reporter: Rohan Desai


I'm getting the following error when I try to describe broker configs using the 
admin client:


```org.apache.kafka.common.errors.InvalidRequestException: Unexpected resource 
type GROUP for resource 0```


I think it's due to this commit: 
https://github.com/apache/kafka/commit/49db5a63c043b50c10c2dfd0648f8d74ee917b6a


My guess at what's going on: now that the client uses ConfigResource instead 
of Resource, it sends a describe request for resource type BROKER with id 3, 
while the broker associates id 3 with GROUP.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-6383) StreamThread.shutdown doesn't clean up completely when called before StreamThread.start

2018-01-02 Thread Rohan Desai (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rohan Desai reopened KAFKA-6383:


I missed a race condition in my fix. If we start and then shut down a streams 
thread without the thread's run() executing in between, then shutdown() throws 
an IllegalThreadStateException. This happens because shutdown() uses 
StreamThread.state to decide whether to call start(), and the state is only 
transitioned from within run(), which may not have executed yet.
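
The race described above can be avoided by asking the JVM whether the thread 
was ever started, instead of consulting a state field that is only set inside 
run(); this is a generic sketch, not the actual Streams fix:

```java
public class SafeShutdown {
    // Sketch: a thread whose getState() is NEW was never started. This
    // holds even in the window after Thread.start() returns but before
    // the thread's run() body has executed, which is exactly the window
    // where an internal state field set inside run() is still stale.
    static boolean wasStarted(Thread t) {
        return t.getState() != Thread.State.NEW;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> { /* no-op worker body */ });
        System.out.println(wasStarted(worker)); // false: never started
        worker.start();
        worker.join();
        System.out.println(wasStarted(worker)); // true, even after run() finished
    }
}
```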

> StreamThread.shutdown doesn't clean up completely when called before 
> StreamThread.start
> ---
>
> Key: KAFKA-6383
> URL: https://issues.apache.org/jira/browse/KAFKA-6383
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Rohan Desai
>Assignee: Rohan Desai
> Fix For: 1.1.0
>
>
> The following code leaks a producer network thread:
> {code}
> ks = new KafkaStreams(...);
> ks.close();
> {code}
> The underlying issue is that KafkaStreams creates a bunch of StreamsThreads 
> via StreamThread.create, which in turn creates a bunch of stuff (including a 
> producer). These resources are cleaned up only when the thread exits. So if 
> the thread was never started, then they are never cleaned up. 
> StreamThread.shutdown should clean up if it sees that the thread has never 
> been started.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6383) StreamThread.shutdown doesn't clean up completely when called beforeStreamThread.start

2017-12-18 Thread Rohan Desai (JIRA)
Rohan Desai created KAFKA-6383:
--

 Summary: StreamThread.shutdown doesn't clean up completely when 
called beforeStreamThread.start
 Key: KAFKA-6383
 URL: https://issues.apache.org/jira/browse/KAFKA-6383
 Project: Kafka
  Issue Type: Bug
Reporter: Rohan Desai


The following code leaks a producer network thread:
{code}
ks = new KafkaStreams(...);
ks.close();
{code}

The underlying issue is that KafkaStreams creates a bunch of StreamsThreads via 
StreamThread.create, which in turn creates a bunch of stuff (including a 
producer). These resources are cleaned up only when the thread exits. So if the 
thread was never started, then they are never cleaned up. StreamThread.shutdown 
should clean up if it sees that the thread has never been started.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)