[jira] [Created] (KAFKA-16955) ConcurrentModification exception thrown by KafkaStream threadState access
Rohan Desai created KAFKA-16955: --- Summary: ConcurrentModification exception thrown by KafkaStream threadState access Key: KAFKA-16955 URL: https://issues.apache.org/jira/browse/KAFKA-16955 Project: Kafka Issue Type: Bug Reporter: Rohan Desai We see occasional ConcurrentModificationExceptions thrown when accessing threadState:

{code:java}
[ERROR] 2024-06-11 07:56:42.417 [e2e-de9a85a4-b1d9-4807-bc34-e269f760bcd7-StreamThread-2] ResponsiveKafkaStreams - stream-client [e2e-de9a85a4-b1d9-4807-bc34-e269f760bcd7] Replacing thread in the streams uncaught exception handler
org.apache.kafka.streams.errors.StreamsException: java.util.ConcurrentModificationException
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:729) ~[kafka-streams-3.7.0.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645) [kafka-streams-3.7.0.jar:?]
Caused by: java.util.ConcurrentModificationException
	at java.util.HashMap$HashIterator.nextNode(HashMap.java:1605) ~[?:?]
	at java.util.HashMap$ValueIterator.next(HashMap.java:1633) ~[?:?]
	at org.apache.kafka.streams.KafkaStreams$StreamStateListener.maybeSetRunning(KafkaStreams.java:656) ~[kafka-streams-3.7.0.jar:?]
	at org.apache.kafka.streams.KafkaStreams$StreamStateListener.onChange(KafkaStreams.java:686) ~[kafka-streams-3.7.0.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:261) ~[kafka-streams-3.7.0.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:1120) ~[kafka-streams-3.7.0.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:921) ~[kafka-streams-3.7.0.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686) ~[kafka-streams-3.7.0.jar:?]
	... 1 more
{code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
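Per the trace, `StreamStateListener.maybeSetRunning` iterates the thread-state map (a plain `HashMap`) while stream threads can concurrently mutate it. The following is not Kafka's code, just a minimal JDK-only sketch of that failure mode: `HashMap`'s fail-fast iterator throws ConcurrentModificationException on mutation during iteration, while a `ConcurrentHashMap` (one common mitigation, alongside locking) does not.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ThreadStateRace {
    // Iterate the map's values while inserting a new key mid-iteration.
    // Returns true if the iteration failed with ConcurrentModificationException.
    static boolean iterateWhileMutating(Map<String, String> threadState) {
        try {
            for (String state : threadState.values()) {
                threadState.put("new-thread", "CREATED"); // structural modification
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, String> unsafe = new HashMap<>();
        unsafe.put("t1", "RUNNING");
        unsafe.put("t2", "STARTING");
        System.out.println(iterateWhileMutating(unsafe)); // true: fail-fast iterator

        Map<String, String> safe = new ConcurrentHashMap<>();
        safe.put("t1", "RUNNING");
        safe.put("t2", "STARTING");
        System.out.println(iterateWhileMutating(safe)); // false: weakly consistent iterator
    }
}
```

Note the sketch mutates from the iterating thread for determinism; in the real bug the mutation comes from another thread, so the exception is only occasional.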
[jira] [Created] (KAFKA-16876) TaskManager.handleRevocation doesn't handle errors thrown from task.prepareCommit
Rohan Desai created KAFKA-16876: --- Summary: TaskManager.handleRevocation doesn't handle errors thrown from task.prepareCommit Key: KAFKA-16876 URL: https://issues.apache.org/jira/browse/KAFKA-16876 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 3.6.0 Reporter: Rohan Desai `TaskManager.handleRevocation` does not handle exceptions thrown by `task.prepareCommit`. In the particular instance I observed, `prepareCommit` flushed caches, which led to downstream `producer.send` calls that threw a `TaskMigratedException`. This means that the tasks that need to be revoked are not suspended by `handleRevocation`. `ConsumerCoordinator` stores the thrown exception and then moves on to the other task assignment callbacks. One of these - `StreamsPartitionAssignor.onAssignment` - tries to close the tasks and raises an `IllegalStateException`. Fortunately, it dirty-closes the tasks if clean close fails, so we don't leak any tasks. I think there are two bugs here: 1. `TaskManager.handleRevocation` should handle errors from `prepareCommit`. It should try not to leave any revoked tasks in an unsuspended state. 2. `ConsumerCoordinator` just throws the first exception that it sees. It seems bad to throw the `TaskMigratedException` and drop the `IllegalStateException` (though in this case I think it's relatively benign). On an `IllegalStateException` we really want the streams thread to exit. One idea is to have `ConsumerCoordinator` throw an exception type that carries the other exceptions it has seen in an additional field, but that breaks the contract for clients that catch specific exceptions. I'm not sure of a clean solution, but I think it's at least worth recording that it would be preferable to have the caller of `poll` handle all the thrown exceptions rather than just the first one.
Here is the IllegalStateException stack trace I observed:

{code:java}
[ERROR] 2024-05-30 06:35:04.556 [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] TaskManager - stream-thread [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] Failed to close task 0_3 cleanly. Attempting to close remaining tasks before re-throwing:
java.lang.IllegalStateException: Illegal state RUNNING while closing active task 0_3
	at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:673) ~[kafka-streams-3.6.0.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamTask.closeClean(StreamTask.java:546) ~[kafka-streams-3.6.0.jar:?]
	at org.apache.kafka.streams.processor.internals.TaskManager.closeTaskClean(TaskManager.java:1295) ~[kafka-streams-3.6.0.jar:?]
	at org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:630) [kafka-streams-3.6.0.jar:?]
	at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350) [kafka-streams-3.6.0.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381) [kafka-streams-3.6.0.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315) [kafka-clients-3.6.0.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469) [kafka-clients-3.6.0.jar:?]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478) [kafka-clients-3.6.0.jar:?]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389) [kafka-clients-3.6.0.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564) [kafka-clients-3.6.0.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220) [kafka-clients-3.6.0.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179) [kafka-clients-3.6.0.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159) [kafka-clients-3.6.0.jar:?]
{code}
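Bug (1) above amounts to: catch per-task errors from `prepareCommit`, still suspend every revoked task, and surface only the first error. This is not the actual TaskManager code — a self-contained sketch with hypothetical `Task`/`FakeTask` stand-ins:

```java
import java.util.List;

public class RevocationSketch {
    interface Task {
        String id();
        void prepareCommit();
        void suspend();
    }

    // Suspend every revoked task even when prepareCommit throws, and report
    // only the first error back to the caller.
    static RuntimeException handleRevocation(List<Task> revoked) {
        RuntimeException firstError = null;
        for (Task task : revoked) {
            try {
                task.prepareCommit();
            } catch (RuntimeException e) {
                if (firstError == null) {
                    firstError = e; // remember the first failure only
                }
            }
            task.suspend(); // never skipped: no task is left unsuspended
        }
        return firstError; // caller decides whether to rethrow
    }

    // Hypothetical stand-in for a stream task, for demonstration only.
    static class FakeTask implements Task {
        final String id;
        final boolean failOnCommit;
        boolean suspended = false;

        FakeTask(String id, boolean failOnCommit) {
            this.id = id;
            this.failOnCommit = failOnCommit;
        }

        public String id() { return id; }

        public void prepareCommit() {
            if (failOnCommit) {
                throw new RuntimeException("TaskMigrated: " + id);
            }
        }

        public void suspend() { suspended = true; }
    }

    public static void main(String[] args) {
        FakeTask a = new FakeTask("0_1", true);
        FakeTask b = new FakeTask("0_2", false);
        List<Task> revoked = List.of(a, b);
        RuntimeException first = handleRevocation(revoked);
        System.out.println(a.suspended && b.suspended); // true: both suspended despite the error
        System.out.println(first.getMessage());         // TaskMigrated: 0_1
    }
}
```

It deliberately does not address bug (2) — how the coordinator should propagate multiple callback exceptions — which the ticket leaves open.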
Re: [VOTE] KIP-924: customizable task assignment for Streams
Thanks all! KIP-924 is accepted with 4 +1 (binding) votes from Sophie Blee-Goldman, Matthias Sax, Bruno Cadonna, and Lucas Brutschy. On Tue, Apr 16, 2024 at 1:46 PM Rohan Desai wrote: > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams > > As this KIP has been open for a while, and gone through a couple rounds of > review/revision, I'm calling a vote to get it approved. >
Re: [DISCUSS] KIP-924: customizable task assignment for Streams
117: as Sophie laid out, there are two cases here: 1. cases that are considered invalid by the existing assignors but are still valid assignments in the sense that they can be used to generate a valid consumer group assignment (from the perspective of the consumer group protocol). An assignment that excludes a task is one such example, and Sophie pointed out a good use case for it. I also think it makes sense to allow these. It's hard to predict how a user might want to use the custom assignor, and it's reasonable to expect them to use it with care rather than hand-holding them. 2. cases that are not valid because it is impossible to compute a valid consumer group assignment from them. In this case it seems totally reasonable to just throw a fatal exception that gets passed to the uncaught exception handler. If this case happens then there is some bug in the user's assignor, and it's totally reasonable to fail the application. We _could_ try to be more graceful and fall back to one of the existing assignors, but it's usually better to fail hard and fast when some illegal state is detected, imo. On Fri, Apr 19, 2024 at 4:18 PM Rohan Desai wrote: > Bruno, I've incorporated your feedback into the KIP document. > > On Fri, Apr 19, 2024 at 3:55 PM Rohan Desai > wrote: > >> Thanks for the feedback Bruno! For the most part I think it makes sense, >> but leaving a couple follow-up thoughts/questions: >> >> re 4: I think Sophie's point was slightly different - that we might want >> to wrap the return type for `assign` in a class so that its easily >> extensible. This makes sense to me. Whether we do that or not, we can have >> the return type be a Set instead of a Map as well. >> >> re 6: Yes, it's a callback that's called with the final assignment. I >> like your suggested name. >> >> On Fri, Apr 5, 2024 at 12:17 PM Rohan Desai >> wrote: >> >>> Thanks for the feedback Sophie! >>> >>> re1: Totally agree. 
The fact that it's related to the partition assignor >>> is clear from just `task.assignor`. I'll update. >>> re3: This is a good point, and something I would find useful personally. >>> I think its worth adding an interface that lets the plugin observe the >>> final assignment. I'll add that. >>> re4: I like the new `NodeAssignment` type. I'll update the KIP with that. >>> >>> On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai >>> wrote: >>> >>>> Thanks for the feedback so far! I think pretty much all of it is >>>> reasonable. I'll reply to it inline: >>>> >>>> > 1. All the API logic is granular at the Task level, except the >>>> previousOwnerForPartition func. I’m not clear what’s the motivation >>>> behind it, does our controller also want to change how the >>>> partitions->tasks mapping is formed? >>>> You're right that this is out of place. I've removed this method as >>>> it's not needed by the task assignor. >>>> >>>> > 2. Just on the API layering itself: it feels a bit weird to have the >>>> three built-in functions (defaultStandbyTaskAssignment etc) sitting in >>>> the ApplicationMetadata class. If we consider them as some default util >>>> functions, how about introducing moving those into their own static util >>>> methods to separate from the ApplicationMetadata “fact objects” ? >>>> Agreed. Updated in the latest revision of the kip. These have been >>>> moved to TaskAssignorUtils >>>> >>>> > 3. I personally prefer `NodeAssignment` to be a read-only object >>>> containing the decisions made by the assignor, including the >>>> requestFollowupRebalance flag. For manipulating the half-baked results >>>> inside the assignor itself, maybe we can just be flexible to let users use >>>> whatever struts / their own classes even, if they like. WDYT? >>>> Agreed. Updated in the latest version of the kip. >>>> >>>> > 1. For the API, thoughts on changing the method signature to return a >>>> (non-Optional) TaskAssignor? 
Then we can either have the default >>>> implementation return new HighAvailabilityTaskAssignor or just have a >>>> default implementation class that people can extend if they don't want to >>>> implement every method. >>>> Based on some other discussion, I actually decided to get rid of the >>>> plugin interface, and instead use config to specify individual plugin >>>> behaviour. So the method you'
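For case (2) in the discussion above — an assignment from which no valid consumer group assignment can be computed — the check can be as simple as verifying that no task is handed to more than one node before translating the result, and failing fast otherwise. A generic sketch with hypothetical types (the actual KIP-924 interfaces differ; plain strings stand in for task IDs and process IDs):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class AssignmentValidation {
    // Reject assignments that map one active task to multiple nodes (case 2).
    // Tasks assigned to no node at all are tolerated (case 1).
    static void validate(Map<String, List<String>> tasksByNode) {
        Set<String> seen = new HashSet<>();
        for (Map.Entry<String, List<String>> entry : tasksByNode.entrySet()) {
            for (String task : entry.getValue()) {
                if (!seen.add(task)) {
                    throw new IllegalStateException(
                        "task " + task + " assigned to multiple nodes, including " + entry.getKey());
                }
            }
        }
    }

    public static void main(String[] args) {
        // Valid: every task appears on exactly one node.
        validate(Map.of("node-1", List.of("0_0", "0_1"), "node-2", List.of("0_2")));
        // Invalid: 0_0 appears on both nodes, so the check fails fast.
        try {
            validate(Map.of("node-1", List.of("0_0"), "node-2", List.of("0_0")));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The thrown IllegalStateException would then reach the uncaught exception handler, matching the fail-hard behaviour argued for above.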
Re: [DISCUSS] KIP-924: customizable task assignment for Streams
Bruno, I've incorporated your feedback into the KIP document. On Fri, Apr 19, 2024 at 3:55 PM Rohan Desai wrote: > Thanks for the feedback Bruno! For the most part I think it makes sense, > but leaving a couple follow-up thoughts/questions: > > re 4: I think Sophie's point was slightly different - that we might want > to wrap the return type for `assign` in a class so that its easily > extensible. This makes sense to me. Whether we do that or not, we can have > the return type be a Set instead of a Map as well. > > re 6: Yes, it's a callback that's called with the final assignment. I like > your suggested name. > > On Fri, Apr 5, 2024 at 12:17 PM Rohan Desai > wrote: > >> Thanks for the feedback Sophie! >> >> re1: Totally agree. The fact that it's related to the partition assignor >> is clear from just `task.assignor`. I'll update. >> re3: This is a good point, and something I would find useful personally. >> I think its worth adding an interface that lets the plugin observe the >> final assignment. I'll add that. >> re4: I like the new `NodeAssignment` type. I'll update the KIP with that. >> >> On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai >> wrote: >> >>> Thanks for the feedback so far! I think pretty much all of it is >>> reasonable. I'll reply to it inline: >>> >>> > 1. All the API logic is granular at the Task level, except the >>> previousOwnerForPartition func. I’m not clear what’s the motivation >>> behind it, does our controller also want to change how the >>> partitions->tasks mapping is formed? >>> You're right that this is out of place. I've removed this method as it's >>> not needed by the task assignor. >>> >>> > 2. Just on the API layering itself: it feels a bit weird to have the >>> three built-in functions (defaultStandbyTaskAssignment etc) sitting in >>> the ApplicationMetadata class. 
If we consider them as some default util >>> functions, how about introducing moving those into their own static util >>> methods to separate from the ApplicationMetadata “fact objects” ? >>> Agreed. Updated in the latest revision of the kip. These have been moved >>> to TaskAssignorUtils >>> >>> > 3. I personally prefer `NodeAssignment` to be a read-only object >>> containing the decisions made by the assignor, including the >>> requestFollowupRebalance flag. For manipulating the half-baked results >>> inside the assignor itself, maybe we can just be flexible to let users use >>> whatever struts / their own classes even, if they like. WDYT? >>> Agreed. Updated in the latest version of the kip. >>> >>> > 1. For the API, thoughts on changing the method signature to return a >>> (non-Optional) TaskAssignor? Then we can either have the default >>> implementation return new HighAvailabilityTaskAssignor or just have a >>> default implementation class that people can extend if they don't want to >>> implement every method. >>> Based on some other discussion, I actually decided to get rid of the >>> plugin interface, and instead use config to specify individual plugin >>> behaviour. So the method you're referring to is no longer part of the >>> proposal. >>> >>> > 3. Speaking of ApplicationMetadata, the javadoc says it's read only >>> but >>> theres methods that return void on it? It's not totally clear to me how >>> that interface is supposed to be used by the assignor. It'd be nice if we >>> could flip that interface such that it becomes part of the output instead >>> of an input to the plugin. >>> I've moved those methods to a util class. They're really utility methods >>> the assignor might want to call to do some default or optimized assignment >>> for some cases like rack-awareness. >>> >>> > 4. We should consider wrapping UUID in a ProcessID class so that we >>> control >>> the interface (there are a few places where UUID is directly used). >>> I like it. 
Updated the proposal. >>> >>> > 5. What does NodeState#newAssignmentForNode() do? I thought the point >>> was >>> for the plugin to make the assignment? Is that the result of the default >>> logic? >>> It doesn't need to be part of the interface. I've removed it. >>> >>> > re 2/6: >>> >>> I generally agree with these points, but I'd rather hash that out in a >>> PR than in the KIP review, as it'll be clearer what gets used how. It seems >>> to me (committers please correct me if I'm wrong) that as long as we're on >>> the same page about what information the interfaces are returning, that's >>> ok at this level of discussion. >>> >>> On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai >>> wrote: >>> >>>> Hello All, >>>> >>>> I'd like to start a discussion on KIP-924 ( >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams) >>>> which proposes an interface to allow users to plug into the streams >>>> partition assignor. The motivation section in the KIP goes into some more >>>> detail on why we think this is a useful addition. Thanks in advance for >>>> your feedback! >>>> >>>> Best Regards, >>>> >>>> Rohan >>>> >>>>
Re: [DISCUSS] KIP-924: customizable task assignment for Streams
Thanks for the feedback Bruno! For the most part I think it makes sense, but leaving a couple follow-up thoughts/questions: re 4: I think Sophie's point was slightly different - that we might want to wrap the return type for `assign` in a class so that its easily extensible. This makes sense to me. Whether we do that or not, we can have the return type be a Set instead of a Map as well. re 6: Yes, it's a callback that's called with the final assignment. I like your suggested name. On Fri, Apr 5, 2024 at 12:17 PM Rohan Desai wrote: > Thanks for the feedback Sophie! > > re1: Totally agree. The fact that it's related to the partition assignor > is clear from just `task.assignor`. I'll update. > re3: This is a good point, and something I would find useful personally. I > think its worth adding an interface that lets the plugin observe the final > assignment. I'll add that. > re4: I like the new `NodeAssignment` type. I'll update the KIP with that. > > On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai > wrote: > >> Thanks for the feedback so far! I think pretty much all of it is >> reasonable. I'll reply to it inline: >> >> > 1. All the API logic is granular at the Task level, except the >> previousOwnerForPartition func. I’m not clear what’s the motivation >> behind it, does our controller also want to change how the >> partitions->tasks mapping is formed? >> You're right that this is out of place. I've removed this method as it's >> not needed by the task assignor. >> >> > 2. Just on the API layering itself: it feels a bit weird to have the >> three built-in functions (defaultStandbyTaskAssignment etc) sitting in >> the ApplicationMetadata class. If we consider them as some default util >> functions, how about introducing moving those into their own static util >> methods to separate from the ApplicationMetadata “fact objects” ? >> Agreed. Updated in the latest revision of the kip. These have been moved >> to TaskAssignorUtils >> >> > 3. 
I personally prefer `NodeAssignment` to be a read-only object >> containing the decisions made by the assignor, including the >> requestFollowupRebalance flag. For manipulating the half-baked results >> inside the assignor itself, maybe we can just be flexible to let users use >> whatever struts / their own classes even, if they like. WDYT? >> Agreed. Updated in the latest version of the kip. >> >> > 1. For the API, thoughts on changing the method signature to return a >> (non-Optional) TaskAssignor? Then we can either have the default >> implementation return new HighAvailabilityTaskAssignor or just have a >> default implementation class that people can extend if they don't want to >> implement every method. >> Based on some other discussion, I actually decided to get rid of the >> plugin interface, and instead use config to specify individual plugin >> behaviour. So the method you're referring to is no longer part of the >> proposal. >> >> > 3. Speaking of ApplicationMetadata, the javadoc says it's read only but >> theres methods that return void on it? It's not totally clear to me how >> that interface is supposed to be used by the assignor. It'd be nice if we >> could flip that interface such that it becomes part of the output instead >> of an input to the plugin. >> I've moved those methods to a util class. They're really utility methods >> the assignor might want to call to do some default or optimized assignment >> for some cases like rack-awareness. >> >> > 4. We should consider wrapping UUID in a ProcessID class so that we >> control >> the interface (there are a few places where UUID is directly used). >> I like it. Updated the proposal. >> >> > 5. What does NodeState#newAssignmentForNode() do? I thought the point >> was >> for the plugin to make the assignment? Is that the result of the default >> logic? >> It doesn't need to be part of the interface. I've removed it. 
>> >> > re 2/6: >> >> I generally agree with these points, but I'd rather hash that out in a PR >> than in the KIP review, as it'll be clearer what gets used how. It seems to >> me (committers please correct me if I'm wrong) that as long as we're on the >> same page about what information the interfaces are returning, that's ok at >> this level of discussion. >> >> On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai >> wrote: >> >>> Hello All, >>> >>> I'd like to start a discussion on KIP-924 ( >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams) >>> which proposes an interface to allow users to plug into the streams >>> partition assignor. The motivation section in the KIP goes into some more >>> detail on why we think this is a useful addition. Thanks in advance for >>> your feedback! >>> >>> Best Regards, >>> >>> Rohan >>> >>>
[VOTE] KIP-924: customizable task assignment for Streams
https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams As this KIP has been open for a while, and gone through a couple rounds of review/revision, I'm calling a vote to get it approved.
Re: [DISCUSS] KIP-924: customizable task assignment for Streams
Thanks for the feedback Sophie! re1: Totally agree. The fact that it's related to the partition assignor is clear from just `task.assignor`. I'll update. re3: This is a good point, and something I would find useful personally. I think its worth adding an interface that lets the plugin observe the final assignment. I'll add that. re4: I like the new `NodeAssignment` type. I'll update the KIP with that. On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai wrote: > Thanks for the feedback so far! I think pretty much all of it is > reasonable. I'll reply to it inline: > > > 1. All the API logic is granular at the Task level, except the > previousOwnerForPartition func. I’m not clear what’s the motivation behind > it, does our controller also want to change how the partitions->tasks > mapping is formed? > You're right that this is out of place. I've removed this method as it's > not needed by the task assignor. > > > 2. Just on the API layering itself: it feels a bit weird to have the > three built-in functions (defaultStandbyTaskAssignment etc) sitting in the > ApplicationMetadata class. If we consider them as some default util > functions, how about introducing moving those into their own static util > methods to separate from the ApplicationMetadata “fact objects” ? > Agreed. Updated in the latest revision of the kip. These have been moved > to TaskAssignorUtils > > > 3. I personally prefer `NodeAssignment` to be a read-only object > containing the decisions made by the assignor, including the > requestFollowupRebalance flag. For manipulating the half-baked results > inside the assignor itself, maybe we can just be flexible to let users use > whatever struts / their own classes even, if they like. WDYT? > Agreed. Updated in the latest version of the kip. > > > 1. For the API, thoughts on changing the method signature to return a > (non-Optional) TaskAssignor? 
Then we can either have the default > implementation return new HighAvailabilityTaskAssignor or just have a > default implementation class that people can extend if they don't want to > implement every method. > Based on some other discussion, I actually decided to get rid of the > plugin interface, and instead use config to specify individual plugin > behaviour. So the method you're referring to is no longer part of the > proposal. > > > 3. Speaking of ApplicationMetadata, the javadoc says it's read only but > theres methods that return void on it? It's not totally clear to me how > that interface is supposed to be used by the assignor. It'd be nice if we > could flip that interface such that it becomes part of the output instead > of an input to the plugin. > I've moved those methods to a util class. They're really utility methods > the assignor might want to call to do some default or optimized assignment > for some cases like rack-awareness. > > > 4. We should consider wrapping UUID in a ProcessID class so that we > control > the interface (there are a few places where UUID is directly used). > I like it. Updated the proposal. > > > 5. What does NodeState#newAssignmentForNode() do? I thought the point > was > for the plugin to make the assignment? Is that the result of the default > logic? > It doesn't need to be part of the interface. I've removed it. > > > re 2/6: > > I generally agree with these points, but I'd rather hash that out in a PR > than in the KIP review, as it'll be clearer what gets used how. It seems to > me (committers please correct me if I'm wrong) that as long as we're on the > same page about what information the interfaces are returning, that's ok at > this level of discussion. 
> > On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai > wrote: > >> Hello All, >> >> I'd like to start a discussion on KIP-924 ( >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams) >> which proposes an interface to allow users to plug into the streams >> partition assignor. The motivation section in the KIP goes into some more >> detail on why we think this is a useful addition. Thanks in advance for >> your feedback! >> >> Best Regards, >> >> Rohan >> >>
Re: [DISCUSS] KIP-924: customizable task assignment for Streams
Thanks for the feedback so far! I think pretty much all of it is reasonable. I'll reply to it inline: > 1. All the API logic is granular at the Task level, except the previousOwnerForPartition func. I’m not clear what’s the motivation behind it, does our controller also want to change how the partitions->tasks mapping is formed? You're right that this is out of place. I've removed this method as it's not needed by the task assignor. > 2. Just on the API layering itself: it feels a bit weird to have the three built-in functions (defaultStandbyTaskAssignment etc.) sitting in the ApplicationMetadata class. If we consider them as some default util functions, how about moving those into their own static util methods, separate from the ApplicationMetadata “fact objects”? Agreed. Updated in the latest revision of the KIP. These have been moved to TaskAssignorUtils. > 3. I personally prefer `NodeAssignment` to be a read-only object containing the decisions made by the assignor, including the requestFollowupRebalance flag. For manipulating the half-baked results inside the assignor itself, maybe we can just be flexible and let users use whatever structs / their own classes, if they like. WDYT? Agreed. Updated in the latest version of the KIP. > 1. For the API, thoughts on changing the method signature to return a (non-Optional) TaskAssignor? Then we can either have the default implementation return new HighAvailabilityTaskAssignor, or just have a default implementation class that people can extend if they don't want to implement every method. Based on some other discussion, I actually decided to get rid of the plugin interface and instead use config to specify individual plugin behaviour. So the method you're referring to is no longer part of the proposal. > 3. Speaking of ApplicationMetadata, the javadoc says it's read-only, but there are methods that return void on it? It's not totally clear to me how that interface is supposed to be used by the assignor. It'd be nice if we could flip that interface such that it becomes part of the output instead of an input to the plugin. I've moved those methods to a util class. They're really utility methods the assignor might want to call to do some default or optimized assignment for cases like rack-awareness. > 4. We should consider wrapping UUID in a ProcessID class so that we control the interface (there are a few places where UUID is directly used). I like it. Updated the proposal. > 5. What does NodeState#newAssignmentForNode() do? I thought the point was for the plugin to make the assignment? Is that the result of the default logic? It doesn't need to be part of the interface. I've removed it. > re 2/6: I generally agree with these points, but I'd rather hash that out in a PR than in the KIP review, as it'll be clearer what gets used how. It seems to me (committers, please correct me if I'm wrong) that as long as we're on the same page about what information the interfaces are returning, that's OK at this level of discussion. On Tue, Nov 7, 2023 at 12:03 PM Rohan Desai wrote: > Hello All, > > I'd like to start a discussion on KIP-924 ( > https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams) > which proposes an interface to allow users to plug into the streams > partition assignor. The motivation section in the KIP goes into some more > detail on why we think this is a useful addition. Thanks in advance for > your feedback! > > Best Regards, > > Rohan > >
[DISCUSS] KIP-924: customizable task assignment for Streams
Hello All, I'd like to start a discussion on KIP-924 ( https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams) which proposes an interface to allow users to plug into the streams partition assignor. The motivation section in the KIP goes into some more detail on why we think this is a useful addition. Thanks in advance for your feedback! Best Regards, Rohan
Re: [VOTE] KIP-761: Add Total Blocked Time Metric to Streams
Hello, I discovered a bug in the design of this metric. The bug is documented here: https://github.com/apache/kafka/pull/11805. We need to include the time the producer spends waiting on topic metadata in the total blocked time. To do this, we need to add a new producer metric that tracks the total time spent blocked on metadata. I've implemented this in a patch here: https://github.com/apache/kafka/pull/11805. I'm hoping I can just update this KIP to include the new producer metric. On Tue, Aug 31, 2021 at 1:07 AM Rohan Desai wrote: > FYI I've updated the metric names in the KIP to the form > ".*-time-ns-total" and clarified that the times being measured are in > nanoseconds. > > On Wed, Jul 21, 2021 at 5:09 PM Rohan Desai > wrote: > >> Now that the discussion thread's been open for a few days, I'm calling >> for a vote on >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-761%3A+Add+Total+Blocked+Time+Metric+to+Streams >> >
Re: [DISCUSS] KIP-761: Add total blocked time metric to streams
Hello, I discovered a bug in the design of this metric. The bug is documented here: https://github.com/apache/kafka/pull/11805. We need to include the time the producer spends waiting on topic metadata in the total blocked time. To do this, we need to add a new producer metric that tracks the total time spent blocked on metadata. I've implemented this in a patch here: https://github.com/apache/kafka/pull/11805. I'm hoping I can just update this KIP to include the new producer metric. On Mon, Jul 12, 2021 at 12:00 PM Rohan Desai wrote: > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-761%3A+Add+Total+Blocked+Time+Metric+to+Streams >
[jira] [Created] (KAFKA-13692) stream thread blocked-time-ns-total metric does not include producer metadata wait time
Rohan Desai created KAFKA-13692: --- Summary: stream thread blocked-time-ns-total metric does not include producer metadata wait time Key: KAFKA-13692 URL: https://issues.apache.org/jira/browse/KAFKA-13692 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 3.2.0 Reporter: Rohan Desai Fix For: 3.3.0 The stream thread blocked-time-ns-total metric does not include producer metadata wait time (time spent in `KafkaProducer.waitOnMetadata`). This can contribute significantly to actual total blocked time in some cases. For example, if a user deletes the streams sink topic, producers will wait until the max block timeout. This time does not get included in total blocked time when it should. -- This message was sent by Atlassian Jira (v8.20.1#820001)
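The missing accounting described in this issue follows a general pattern: time the blocking call and fold the elapsed nanoseconds into a cumulative counter, as the `*-time-ns-total` metrics do. Below is a minimal, hypothetical sketch of that pattern in plain Java; this is not Kafka's internal code, and `BlockedTimeTracker` and its methods are invented for illustration:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical sketch: wrap a blocking call (e.g. the producer's metadata
// wait) and fold its duration into a cumulative nanosecond counter,
// mirroring how the *-time-ns-total metrics accumulate.
public class BlockedTimeTracker {
    private final AtomicLong blockedNsTotal = new AtomicLong();

    // Time a blocking section and add the elapsed nanos to the running total.
    public <T> T timeBlocked(Supplier<T> blockingCall) {
        long start = System.nanoTime();
        try {
            return blockingCall.get();
        } finally {
            blockedNsTotal.addAndGet(System.nanoTime() - start);
        }
    }

    public long blockedNsTotal() {
        return blockedNsTotal.get();
    }

    public static void main(String[] args) {
        BlockedTimeTracker tracker = new BlockedTimeTracker();
        String result = tracker.timeBlocked(() -> {
            try { Thread.sleep(5); } catch (InterruptedException e) { }
            return "metadata";
        });
        System.out.println(result + " blocked ns: " + tracker.blockedNsTotal());
    }
}
```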
Re: [VOTE] KIP-761: Add Total Blocked Time Metric to Streams
FYI I've updated the metric names in the KIP to the form ".*-time-ns-total" and clarified that the times being measured are in nanoseconds. On Wed, Jul 21, 2021 at 5:09 PM Rohan Desai wrote: > Now that the discussion thread's been open for a few days, I'm calling for > a vote on > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-761%3A+Add+Total+Blocked+Time+Metric+to+Streams >
[jira] [Created] (KAFKA-13229) KIP-761: implement a total blocked time metric in Kafka Streams
Rohan Desai created KAFKA-13229: --- Summary: KIP-761: implement a total blocked time metric in Kafka Streams Key: KAFKA-13229 URL: https://issues.apache.org/jira/browse/KAFKA-13229 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 3.1.0 Reporter: Rohan Desai Fix For: 3.1.0 KIP-761 proposes a total blocked time metric in streams that measures the total time (since the thread was started) that a given thread is blocked on Kafka. KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-761%3A+Add+Total+Blocked+Time+Metric+to+Streams -- This message was sent by Atlassian Jira (v8.3.4#803005)
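As a rough illustration of how a caller might use the proposed metric: sample the cumulative blocked-time total at two points in time and derive the thread's utilization over that window. The class below is a hypothetical sketch of that arithmetic, not part of the KIP:

```java
// Hypothetical sketch: derive windowed utilization from two samples of a
// thread's cumulative blocked-time-ns-total metric.
public class Utilization {
    // Fraction of the window the thread spent working (i.e. not blocked on
    // Kafka), given blocked-time samples at the window's start and end.
    public static double over(long blockedNsStart, long blockedNsEnd, long windowNs) {
        double blocked = (double) (blockedNsEnd - blockedNsStart);
        return 1.0 - Math.min(1.0, blocked / windowNs);
    }

    public static void main(String[] args) {
        // 400 ms blocked over a 1 s window -> 60% utilized
        System.out.println(over(1_000_000_000L, 1_400_000_000L, 1_000_000_000L));
    }
}
```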
Re: [DISCUSS] KIP-761: Add total blocked time metric to streams
re sophie: The intent here was to include all blocked time (not just `RUNNING`). The caller can window the total blocked time themselves, and that can be compared with a timeseries of the state to understand the ratio in different states. I'll update the KIP to include `committed`. The admin API calls should be accounted for by the admin client iotime/iowaittime metrics. On Tue, Jul 20, 2021 at 11:49 PM Rohan Desai wrote: > > I remember now that we moved the round-trip PID's txn completion logic > into > init-transaction and commit/abort-transaction. So I think we'd count time > as in StreamsProducer#initTransaction as well (admittedly it is in most > cases a one-time thing). > > Makes sense - I'll update the KIP > > On Tue, Jul 20, 2021 at 11:48 PM Rohan Desai > wrote: > >> >> > I had a question - it seems like from the descriptionsof >> `txn-commit-time-total` and `offset-commit-time-total` that they measure >> similar processes for ALOS and EOS, but only `txn-commit-time-total` is >> included in `blocked-time-total`. Why isn't `offset-commit-time-total` also >> included? >> >> I've updated the KIP to include it. >> >> > Aside from `flush-time-total`, `txn-commit-time-total` and >> `offset-commit-time-total`, which will be producer/consumer client >> metrics, >> the rest of the metrics will be streams metrics that will be thread level, >> is that right? >> >> Based on the feedback from Guozhang, I've updated the KIP to reflect that >> the lower-level metrics are all client metrics that are then summed to >> compute the blocked time metric, which is a Streams metric. >> >> On Tue, Jul 20, 2021 at 11:58 AM Rohan Desai >> wrote: >> >>> > Similarly, I think "txn-commit-time-total" and >>> "offset-commit-time-total" may better be inside producer and consumer >>> clients respectively. >>> >>> I agree for offset-commit-time-total. 
For txn-commit-time-total I'm >>> proposing we measure `StreamsProducer.commitTransaction`, which wraps >>> multiple producer calls (sendOffsets, commitTransaction) >>> >>> > > For "txn-commit-time-total" specifically, besides >>> producer.commitTxn. >>> other txn-related calls may also be blocking, including >>> producer.beginTxn/abortTxn, I saw you mentioned "txn-begin-time-total" >>> later in the doc, but did not include it as a separate metric, and >>> similarly, should we have a `txn-abort-time-total` as well? If yes, >>> could >>> you update the KIP page accordingly. >>> >>> `beginTransaction` is not blocking - I meant to remove that from that >>> doc. I'll add something for abort. >>> >>> On Mon, Jul 19, 2021 at 11:55 PM Rohan Desai >>> wrote: >>> >>>> Thanks for the review Guozhang! responding to your feedback inline: >>>> >>>> > 1) I agree that the current ratio metrics is just "snapshot in >>>> point", and >>>> more flexible metrics that would allow reporters to calculate based on >>>> window intervals are better. However, the current mechanism of the >>>> proposed >>>> metrics assumes the thread->clients mapping as of today, where each >>>> thread >>>> would own exclusively one main consumer, restore consumer, producer and >>>> an >>>> admin client. But this mapping may be subject to change in the future. >>>> Have >>>> you thought about how this metric can be extended when, e.g. the >>>> embedded >>>> clients and stream threads are de-coupled? >>>> >>>> Of course this depends on how exactly we refactor the runtime - >>>> assuming that we plan to factor out consumers into an "I/O" layer that is >>>> responsible for receiving records and enqueuing them to be processed by >>>> processing threads, then I think it should be reasonable to count the time >>>> we spend blocked on this internal queue(s) as blocked. 
The main concern >>>> there to me is that the I/O layer would be doing something expensive like >>>> decompression that shouldn't be counted as "blocked". But if that really is >>>> so expensive that it starts to throw off our ratios then it's probably >>>> indicative of a larger problem that the "i/o layer" is a bottleneck and it >>>> would be worth refactoring so that decompression (or insert other expensive >>>
[VOTE] KIP-761: Add Total Blocked Time Metric to Streams
Now that the discussion thread's been open for a few days, I'm calling for a vote on https://cwiki.apache.org/confluence/display/KAFKA/KIP-761%3A+Add+Total+Blocked+Time+Metric+to+Streams
Re: [DISCUSS] KIP-761: Add total blocked time metric to streams
> I remember now that we moved the round-trip PID's txn completion logic into init-transaction and commit/abort-transaction. So I think we'd count time as in StreamsProducer#initTransaction as well (admittedly it is in most cases a one-time thing). Makes sense - I'll update the KIP On Tue, Jul 20, 2021 at 11:48 PM Rohan Desai wrote: > > > I had a question - it seems like from the descriptionsof > `txn-commit-time-total` and `offset-commit-time-total` that they measure > similar processes for ALOS and EOS, but only `txn-commit-time-total` is > included in `blocked-time-total`. Why isn't `offset-commit-time-total` also > included? > > I've updated the KIP to include it. > > > Aside from `flush-time-total`, `txn-commit-time-total` and > `offset-commit-time-total`, which will be producer/consumer client metrics, > the rest of the metrics will be streams metrics that will be thread level, > is that right? > > Based on the feedback from Guozhang, I've updated the KIP to reflect that > the lower-level metrics are all client metrics that are then summed to > compute the blocked time metric, which is a Streams metric. > > On Tue, Jul 20, 2021 at 11:58 AM Rohan Desai > wrote: > >> > Similarly, I think "txn-commit-time-total" and >> "offset-commit-time-total" may better be inside producer and consumer >> clients respectively. >> >> I agree for offset-commit-time-total. For txn-commit-time-total I'm >> proposing we measure `StreamsProducer.commitTransaction`, which wraps >> multiple producer calls (sendOffsets, commitTransaction) >> >> > > For "txn-commit-time-total" specifically, besides producer.commitTxn. >> other txn-related calls may also be blocking, including >> producer.beginTxn/abortTxn, I saw you mentioned "txn-begin-time-total" >> later in the doc, but did not include it as a separate metric, and >> similarly, should we have a `txn-abort-time-total` as well? If yes, could >> you update the KIP page accordingly. 
>> >> `beginTransaction` is not blocking - I meant to remove that from that >> doc. I'll add something for abort. >> >> On Mon, Jul 19, 2021 at 11:55 PM Rohan Desai >> wrote: >> >>> Thanks for the review Guozhang! responding to your feedback inline: >>> >>> > 1) I agree that the current ratio metrics is just "snapshot in >>> point", and >>> more flexible metrics that would allow reporters to calculate based on >>> window intervals are better. However, the current mechanism of the >>> proposed >>> metrics assumes the thread->clients mapping as of today, where each >>> thread >>> would own exclusively one main consumer, restore consumer, producer and >>> an >>> admin client. But this mapping may be subject to change in the future. >>> Have >>> you thought about how this metric can be extended when, e.g. the embedded >>> clients and stream threads are de-coupled? >>> >>> Of course this depends on how exactly we refactor the runtime - assuming >>> that we plan to factor out consumers into an "I/O" layer that is >>> responsible for receiving records and enqueuing them to be processed by >>> processing threads, then I think it should be reasonable to count the time >>> we spend blocked on this internal queue(s) as blocked. The main concern >>> there to me is that the I/O layer would be doing something expensive like >>> decompression that shouldn't be counted as "blocked". But if that really is >>> so expensive that it starts to throw off our ratios then it's probably >>> indicative of a larger problem that the "i/o layer" is a bottleneck and it >>> would be worth refactoring so that decompression (or insert other expensive >>> thing here) can also be done on the processing threads. >>> >>> > 2) [This and all below are minor comments] The "flush-time-total" may >>> better be a producer client metric, as "flush-wait-time-total", than a >>> streams metric, though the streams-level "total-blocked" can still >>> leverage >>> it. 
Similarly, I think "txn-commit-time-total" and >>> "offset-commit-time-total" may better be inside producer and consumer >>> clients respectively. >>> >>> Good call - I'll update the KIP >>> >>> > 3) The doc was not very clear on how "thread-start-time" would be >>> needed >>> when calculating streams util
Re: [DISCUSS] KIP-761: Add total blocked time metric to streams
> I had a question - it seems like from the descriptionsof `txn-commit-time-total` and `offset-commit-time-total` that they measure similar processes for ALOS and EOS, but only `txn-commit-time-total` is included in `blocked-time-total`. Why isn't `offset-commit-time-total` also included? I've updated the KIP to include it. > Aside from `flush-time-total`, `txn-commit-time-total` and `offset-commit-time-total`, which will be producer/consumer client metrics, the rest of the metrics will be streams metrics that will be thread level, is that right? Based on the feedback from Guozhang, I've updated the KIP to reflect that the lower-level metrics are all client metrics that are then summed to compute the blocked time metric, which is a Streams metric. On Tue, Jul 20, 2021 at 11:58 AM Rohan Desai wrote: > > Similarly, I think "txn-commit-time-total" and > "offset-commit-time-total" may better be inside producer and consumer > clients respectively. > > I agree for offset-commit-time-total. For txn-commit-time-total I'm > proposing we measure `StreamsProducer.commitTransaction`, which wraps > multiple producer calls (sendOffsets, commitTransaction) > > > > For "txn-commit-time-total" specifically, besides producer.commitTxn. > other txn-related calls may also be blocking, including > producer.beginTxn/abortTxn, I saw you mentioned "txn-begin-time-total" > later in the doc, but did not include it as a separate metric, and > similarly, should we have a `txn-abort-time-total` as well? If yes, could > you update the KIP page accordingly. > > `beginTransaction` is not blocking - I meant to remove that from that doc. > I'll add something for abort. > > On Mon, Jul 19, 2021 at 11:55 PM Rohan Desai > wrote: > >> Thanks for the review Guozhang! responding to your feedback inline: >> >> > 1) I agree that the current ratio metrics is just "snapshot in point", >> and >> more flexible metrics that would allow reporters to calculate based on >> window intervals are better. 
However, the current mechanism of the >> proposed >> metrics assumes the thread->clients mapping as of today, where each thread >> would own exclusively one main consumer, restore consumer, producer and an >> admin client. But this mapping may be subject to change in the future. >> Have >> you thought about how this metric can be extended when, e.g. the embedded >> clients and stream threads are de-coupled? >> >> Of course this depends on how exactly we refactor the runtime - assuming >> that we plan to factor out consumers into an "I/O" layer that is >> responsible for receiving records and enqueuing them to be processed by >> processing threads, then I think it should be reasonable to count the time >> we spend blocked on this internal queue(s) as blocked. The main concern >> there to me is that the I/O layer would be doing something expensive like >> decompression that shouldn't be counted as "blocked". But if that really is >> so expensive that it starts to throw off our ratios then it's probably >> indicative of a larger problem that the "i/o layer" is a bottleneck and it >> would be worth refactoring so that decompression (or insert other expensive >> thing here) can also be done on the processing threads. >> >> > 2) [This and all below are minor comments] The "flush-time-total" may >> better be a producer client metric, as "flush-wait-time-total", than a >> streams metric, though the streams-level "total-blocked" can still >> leverage >> it. Similarly, I think "txn-commit-time-total" and >> "offset-commit-time-total" may better be inside producer and consumer >> clients respectively. >> >> Good call - I'll update the KIP >> >> > 3) The doc was not very clear on how "thread-start-time" would be >> needed >> when calculating streams utilization along with total-blocked time, could >> you elaborate a bit more in the KIP? >> >> Yes, will do. >> >> > For "txn-commit-time-total" specifically, besides producer.commitTxn. 
>> other txn-related calls may also be blocking, including >> producer.beginTxn/abortTxn, I saw you mentioned "txn-begin-time-total" >> later in the doc, but did not include it as a separate metric, and >> similarly, should we have a `txn-abort-time-total` as well? If yes, could >> you update the KIP page accordingly. >> >> Ack. >> >> On Mon, Jul 12, 2021 at 11:29 PM Rohan Desai >> wrote: >> >>> Hello All, >>> >>> I'd like to start a discussion on the KIP linked above which proposes >>> some metrics that we would find useful to help measure whether a Kafka >>> Streams application is saturated. The motivation section in the KIP goes >>> into some more detail on why we think this is a useful addition to the >>> metrics already implemented. Thanks in advance for your feedback! >>> >>> Best Regards, >>> >>> Rohan >>> >>> On Mon, Jul 12, 2021 at 12:00 PM Rohan Desai >>> wrote: >>> >>>> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-761%3A+Add+Total+Blocked+Time+Metric+to+Streams >>>> >>>
Re: [DISCUSS] KIP-761: Add total blocked time metric to streams
> Similarly, I think "txn-commit-time-total" and "offset-commit-time-total" may better be inside producer and consumer clients respectively. I agree for offset-commit-time-total. For txn-commit-time-total I'm proposing we measure `StreamsProducer.commitTransaction`, which wraps multiple producer calls (sendOffsets, commitTransaction) > > For "txn-commit-time-total" specifically, besides producer.commitTxn. other txn-related calls may also be blocking, including producer.beginTxn/abortTxn, I saw you mentioned "txn-begin-time-total" later in the doc, but did not include it as a separate metric, and similarly, should we have a `txn-abort-time-total` as well? If yes, could you update the KIP page accordingly. `beginTransaction` is not blocking - I meant to remove that from that doc. I'll add something for abort. On Mon, Jul 19, 2021 at 11:55 PM Rohan Desai wrote: > Thanks for the review Guozhang! responding to your feedback inline: > > > 1) I agree that the current ratio metrics is just "snapshot in point", > and > more flexible metrics that would allow reporters to calculate based on > window intervals are better. However, the current mechanism of the proposed > metrics assumes the thread->clients mapping as of today, where each thread > would own exclusively one main consumer, restore consumer, producer and an > admin client. But this mapping may be subject to change in the future. Have > you thought about how this metric can be extended when, e.g. the embedded > clients and stream threads are de-coupled? > > Of course this depends on how exactly we refactor the runtime - assuming > that we plan to factor out consumers into an "I/O" layer that is > responsible for receiving records and enqueuing them to be processed by > processing threads, then I think it should be reasonable to count the time > we spend blocked on this internal queue(s) as blocked. 
The main concern > there to me is that the I/O layer would be doing something expensive like > decompression that shouldn't be counted as "blocked". But if that really is > so expensive that it starts to throw off our ratios then it's probably > indicative of a larger problem that the "i/o layer" is a bottleneck and it > would be worth refactoring so that decompression (or insert other expensive > thing here) can also be done on the processing threads. > > > 2) [This and all below are minor comments] The "flush-time-total" may > better be a producer client metric, as "flush-wait-time-total", than a > streams metric, though the streams-level "total-blocked" can still leverage > it. Similarly, I think "txn-commit-time-total" and > "offset-commit-time-total" may better be inside producer and consumer > clients respectively. > > Good call - I'll update the KIP > > > 3) The doc was not very clear on how "thread-start-time" would be needed > when calculating streams utilization along with total-blocked time, could > you elaborate a bit more in the KIP? > > Yes, will do. > > > For "txn-commit-time-total" specifically, besides producer.commitTxn. > other txn-related calls may also be blocking, including > producer.beginTxn/abortTxn, I saw you mentioned "txn-begin-time-total" > later in the doc, but did not include it as a separate metric, and > similarly, should we have a `txn-abort-time-total` as well? If yes, could > you update the KIP page accordingly. > > Ack. > > On Mon, Jul 12, 2021 at 11:29 PM Rohan Desai > wrote: > >> Hello All, >> >> I'd like to start a discussion on the KIP linked above which proposes >> some metrics that we would find useful to help measure whether a Kafka >> Streams application is saturated. The motivation section in the KIP goes >> into some more detail on why we think this is a useful addition to the >> metrics already implemented. Thanks in advance for your feedback! 
>> >> Best Regards, >> >> Rohan >> >> On Mon, Jul 12, 2021 at 12:00 PM Rohan Desai >> wrote: >> >>> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-761%3A+Add+Total+Blocked+Time+Metric+to+Streams >>> >>
Re: [DISCUSS] KIP-761: Add total blocked time metric to streams
Thanks for the review Guozhang! responding to your feedback inline: > 1) I agree that the current ratio metrics is just "snapshot in point", and more flexible metrics that would allow reporters to calculate based on window intervals are better. However, the current mechanism of the proposed metrics assumes the thread->clients mapping as of today, where each thread would own exclusively one main consumer, restore consumer, producer and an admin client. But this mapping may be subject to change in the future. Have you thought about how this metric can be extended when, e.g. the embedded clients and stream threads are de-coupled? Of course this depends on how exactly we refactor the runtime - assuming that we plan to factor out consumers into an "I/O" layer that is responsible for receiving records and enqueuing them to be processed by processing threads, then I think it should be reasonable to count the time we spend blocked on this internal queue(s) as blocked. The main concern there to me is that the I/O layer would be doing something expensive like decompression that shouldn't be counted as "blocked". But if that really is so expensive that it starts to throw off our ratios then it's probably indicative of a larger problem that the "i/o layer" is a bottleneck and it would be worth refactoring so that decompression (or insert other expensive thing here) can also be done on the processing threads. > 2) [This and all below are minor comments] The "flush-time-total" may better be a producer client metric, as "flush-wait-time-total", than a streams metric, though the streams-level "total-blocked" can still leverage it. Similarly, I think "txn-commit-time-total" and "offset-commit-time-total" may better be inside producer and consumer clients respectively. Good call - I'll update the KIP > 3) The doc was not very clear on how "thread-start-time" would be needed when calculating streams utilization along with total-blocked time, could you elaborate a bit more in the KIP? 
Yes, will do. > For "txn-commit-time-total" specifically, besides producer.commitTxn. other txn-related calls may also be blocking, including producer.beginTxn/abortTxn, I saw you mentioned "txn-begin-time-total" later in the doc, but did not include it as a separate metric, and similarly, should we have a `txn-abort-time-total` as well? If yes, could you update the KIP page accordingly. Ack. On Mon, Jul 12, 2021 at 11:29 PM Rohan Desai wrote: > Hello All, > > I'd like to start a discussion on the KIP linked above which proposes some > metrics that we would find useful to help measure whether a Kafka Streams > application is saturated. The motivation section in the KIP goes into some > more detail on why we think this is a useful addition to the metrics > already implemented. Thanks in advance for your feedback! > > Best Regards, > > Rohan > > On Mon, Jul 12, 2021 at 12:00 PM Rohan Desai > wrote: > >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-761%3A+Add+Total+Blocked+Time+Metric+to+Streams >> >
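To make the summing concrete: per the KIP as revised in this thread, the thread-level blocked time is the sum of lower-level client metrics. A hedged sketch in plain Java, with illustrative metric names (the authoritative list of metrics is in the KIP itself):

```java
import java.util.Map;

// Sketch: thread-level blocked time as the sum of the per-client cumulative
// metrics. Metric names below are illustrative, not the KIP's exact names.
public class TotalBlockedTime {
    public static long sumNs(Map<String, Long> clientMetricsNs) {
        return clientMetricsNs.values().stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        Map<String, Long> metrics = Map.of(
            "consumer-io-wait-time-ns-total", 120_000_000L,
            "producer-flush-wait-time-ns-total", 30_000_000L,
            "producer-txn-commit-time-ns-total", 50_000_000L);
        System.out.println(sumNs(metrics)); // 200000000
    }
}
```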
Re: [DISCUSS] KIP-761: Add total blocked time metric to streams
Hello All, I'd like to start a discussion on the KIP linked above which proposes some metrics that we would find useful to help measure whether a Kafka Streams application is saturated. The motivation section in the KIP goes into some more detail on why we think this is a useful addition to the metrics already implemented. Thanks in advance for your feedback! Best Regards, Rohan On Mon, Jul 12, 2021 at 12:00 PM Rohan Desai wrote: > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-761%3A+Add+Total+Blocked+Time+Metric+to+Streams >
[DISCUSS] KIP-761: Add total blocked time metric to streams
https://cwiki.apache.org/confluence/display/KAFKA/KIP-761%3A+Add+Total+Blocked+Time+Metric+to+Streams
[jira] [Created] (KAFKA-12707) KafkaProducer should have a clearer error message on sasl.mechanism misconfiguration
Rohan Desai created KAFKA-12707: --- Summary: KafkaProducer should have a clearer error message on sasl.mechanism misconfiguration Key: KAFKA-12707 URL: https://issues.apache.org/jira/browse/KAFKA-12707 Project: Kafka Issue Type: Bug Affects Versions: 3.0.0 Reporter: Rohan Desai Not sure if this is producer-specific, but I accidentally configured my producer by setting: ``` sasl.mechanism=plain ``` instead of ``` sasl.mechanism=PLAIN ``` When I did this, the producer just hangs and logs in a loop like this, which isn't very informative: [2021-04-21 21:33:20,519] WARN [Producer clientId=producer-1] Bootstrap broker pkc-g91q1.us-west-2.aws.stag.cpdev.cloud:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1050) [2021-04-21 21:33:21,584] INFO [Producer clientId=producer-1] Failed to create channel due to (org.apache.kafka.common.network.SaslChannelBuilder:239) org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to create SaslClient with mechanism plain [2021-04-21 21:33:21,584] WARN [Producer clientId=producer-1] Error connecting to node pkc-g91q1.us-west-2.aws.stag.cpdev.cloud:9092 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:982) java.io.IOException: Channel could not be created for socket java.nio.channels.SocketChannel[closed] at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:348) at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329) at org.apache.kafka.common.network.Selector.connect(Selector.java:256) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:977) at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73) at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1148) at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1036) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:240) at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338) ... 10 more Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to create SaslClient with mechanism plain [2021-04-21 21:33:21,584] WARN [Producer clientId=producer-1] Bootstrap broker pkc-g91q1.us-west-2.aws.stag.cpdev.cloud:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1050) It would be better to early-exit with a clear error message -- This message was sent by Atlassian Jira (v8.3.4#803005)
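For reference, a correctly-cased client configuration looks roughly like the following. The bootstrap server, username, and password are placeholders, and the sketch only builds plain `java.util.Properties` rather than touching any Kafka classes:

```java
import java.util.Properties;

// Sketch of a SASL/PLAIN client config. Mechanism names are case-sensitive:
// "PLAIN", not "plain" -- the lowercase form triggers the unhelpful
// connect/disconnect loop described in this issue.
public class SaslConfig {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092"); // placeholder
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN"); // must be upper-case
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"user\" password=\"secret\";"); // placeholders
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("sasl.mechanism")); // PLAIN
    }
}
```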
[jira] [Created] (KAFKA-10585) Kafka Streams should clean up the state store directory from cleanup
Rohan Desai created KAFKA-10585: --- Summary: Kafka Streams should clean up the state store directory from cleanup Key: KAFKA-10585 URL: https://issues.apache.org/jira/browse/KAFKA-10585 Project: Kafka Issue Type: Bug Components: streams Reporter: Rohan Desai Currently, `KafkaStreams.cleanup` cleans up all the task-level directories and the global directory. However it doesn't clean up the enclosing state store directory, though streams does create this directory when it initializes the state for the streams app. Feels like it should remove this directory when it cleans up. We notice this in ksql quite often, since every new query is a new streams app. Over time, we see lots of state store directories left around for old queries. -- This message was sent by Atlassian Jira (v8.3.4#803005)
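A user-side workaround, until Streams removes the enclosing directory itself, is to delete the application's state directory after calling `KafkaStreams.cleanup()`. The sketch below shows only the recursive-delete half; the `<state.dir>/<application.id>` layout is an assumption about where the leftover directory lives:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Sketch of a workaround: after KafkaStreams.cleanup() (which per this issue
// leaves the enclosing directory behind), delete the application's state
// directory tree bottom-up (children before parents).
public class StateDirCleaner {
    public static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> walk = Files.walk(dir)) {
            walk.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }

    public static void main(String[] args) throws IOException {
        // Simulate the leftover layout: <state.dir>/<application.id>/...
        Path stateDir = Files.createTempDirectory("state-dir");
        Path appDir = Files.createDirectories(stateDir.resolve("my-app-id").resolve("0_0"));
        Files.writeString(appDir.resolve("checkpoint"), "0");
        deleteRecursively(stateDir.resolve("my-app-id"));
        System.out.println(Files.exists(stateDir.resolve("my-app-id"))); // false
    }
}
```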
[jira] [Created] (KAFKA-9659) Kafka Streams / Consumer fails on "fatal exception: group.instance.id gets fenced"
Rohan Desai created KAFKA-9659: -- Summary: Kafka Streams / Consumer fails on "fatal exception: group.instance.id gets fenced" Key: KAFKA-9659 URL: https://issues.apache.org/jira/browse/KAFKA-9659 Project: Kafka Issue Type: Bug Affects Versions: 2.5.0 Reporter: Rohan Desai I'm running a KSQL query, which underneath is built into a Kafka Streams application. The application has been running without issue for a few days, until today, when all the streams threads exited with: ``` [ERROR] 2020-03-05 00:57:58,776 [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] Received fatal exception: group.instance.id gets fenced [ERROR] 2020-03-05 00:57:58,776 [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator onFailure - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] Caught fenced group.instance.id Optional[ksql-1-2] error in heartbeat thread [ERROR] 2020-03-05 00:57:58,776 [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] org.apache.kafka.streams.processor.internals.StreamThread run - stream-thread [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] Encountered the following unexpected Kafka exception during processing, this usually 
indicate Streams internal errors: org.apache.kafka.common.errors.FencedInstanceIdException: The broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id. [INFO] 2020-03-05 00:57:58,776 [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] org.apache.kafka.streams.processor.internals.StreamThread setState - stream-thread [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] State transition from RUNNING to PENDING_SHUTDOWN ``` This event coincided with a broker (broker 2) having some downtime (as measured by a healthchecking service which periodically pings it with a produce/consume). I've attached the KSQL and Kafka Streams logs to this ticket. Here's a summary for one of the streams threads (instance id `ksql-1-2`): Around 00:56:36 the coordinator fails over from b11 to b2: ``` [INFO] 2020-03-05 00:56:36,258 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Attempt to heartbeat failed since coordinator b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483636 rack: null) is either not started or not valid. 
[INFO] 2020-03-05 00:56:36,258 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator markCoordinatorUnknown - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Group coordinator b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483636 rack: null) is unavailable or invalid, will attempt rediscovery [INFO] 2020-03-05 00:56:36,270 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator onSuccess - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Discovered group coordinator b2-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483645 rack: null) ``` A few seconds la
[VOTE] KIP-425: Add some Log4J Kafka Appender Properties for Producing to Secured Brokers
Calling a vote for this since it's pretty trivial/non-controversial https://cwiki.apache.org/confluence/display/KAFKA/KIP-425%3A+Add+some+Log4J+Kafka+Appender+Properties+for+Producing+to+Secured+Brokers
Re: KIP-425
Please ignore. Starting a new one with the right subject. On Sat, Feb 2, 2019 at 4:23 PM Rohan Desai wrote: > Starting a mail thread to discuss KIP-425: Add some Log4J Kafka Appender > Properties for Producing to Secured Brokers >
KIP-425
Starting a mail thread to discuss KIP-425: Add some Log4J Kafka Appender Properties for Producing to Secured Brokers
[DISCUSS] KIP-425: Add some Log4J Kafka Appender Properties for Producing to Secured Brokers
https://cwiki.apache.org/confluence/display/KAFKA/KIP-425%3A+Add+some+Log4J+Kafka+Appender+Properties+for+Producing+to+Secured+Brokers
[jira] [Created] (KAFKA-7896) Add some Log4J Kafka Properties for Producing to Secured Brokers
Rohan Desai created KAFKA-7896: -- Summary: Add some Log4J Kafka Properties for Producing to Secured Brokers Key: KAFKA-7896 URL: https://issues.apache.org/jira/browse/KAFKA-7896 Project: Kafka Issue Type: Bug Reporter: Rohan Desai The existing Log4J Kafka appender supports producing to brokers that use the GSSAPI (Kerberos) SASL mechanism, and only supports configuring JAAS via a JAAS config file. Filing this issue to cover extending this to include the PLAIN mechanism and to support configuring JAAS via an in-line configuration. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
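A configuration along these lines is what the extension would enable. This is a sketch only: the property names and values below are illustrative assumptions, not the appender's confirmed configuration keys.

```properties
# Hypothetical log4j.properties sketch: producing log events over SASL/PLAIN
# with an in-line JAAS config instead of a separate jaas file.
# Property names are illustrative and may differ in the actual appender.
log4j.appender.KAFKA=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.KAFKA.brokerList=broker1:9093
log4j.appender.KAFKA.topic=log4j-events
log4j.appender.KAFKA.securityProtocol=SASL_SSL
log4j.appender.KAFKA.saslMechanism=PLAIN
log4j.appender.KAFKA.clientJaasConf=org.apache.kafka.common.security.plain.PlainLoginModule required username="log4j" password="log4j-secret";
```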
Request for Confluence access
Hello, I want to write a KIP and need access to do so in Confluence.
[jira] [Created] (KAFKA-7311) Sender should reset next batch expiry time between poll loops
Rohan Desai created KAFKA-7311: -- Summary: Sender should reset next batch expiry time between poll loops Key: KAFKA-7311 URL: https://issues.apache.org/jira/browse/KAFKA-7311 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 2.1.0 Reporter: Rohan Desai Fix For: 2.1.0 Sender does not reset next batch expiry time between poll loops. This means that once it crosses the expiry time of the first batch, it starts spinning on epoll with a timeout of 0, which consumes a lot of CPU. We observed this running KSQL when investigating why throughput would drop after about 10 minutes (the default delivery timeout). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
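The spin described above can be sketched roughly as follows. This is an illustrative model of the timeout computation, not Kafka's actual Sender code; the method and values are hypothetical.

```java
// Illustrative sketch (not Kafka's Sender) of how a stale next-batch-expiry
// deadline pins the poll timeout at zero once the deadline has passed.
public class ExpirySpinSketch {
    // Poll timeout = time until the next batch expires, capped at maxWaitMs
    // and never negative. If nextExpiryMs is not recomputed between poll
    // loops, then once nowMs passes it this returns 0 forever and the loop
    // calls epoll with no wait, burning CPU.
    static long pollTimeout(long nowMs, long maxWaitMs, long nextExpiryMs) {
        return Math.max(0L, Math.min(maxWaitMs, nextExpiryMs - nowMs));
    }

    public static void main(String[] args) {
        long staleExpiryMs = 1_000L;  // set when the first batch was created
        // Before the first batch's deadline: the sender blocks as expected.
        System.out.println(pollTimeout(500L, 5_000L, staleExpiryMs));   // 500
        // After the deadline, with the expiry never reset: timeout is 0.
        System.out.println(pollTimeout(2_000L, 5_000L, staleExpiryMs)); // 0
        // The fix: recompute the next expiry from the current batches at the
        // top of every loop (7_000L here is a hypothetical fresh deadline).
        System.out.println(pollTimeout(2_000L, 5_000L, 7_000L));        // 5000
    }
}
```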
[jira] [Created] (KAFKA-7064) "Unexpected resource type GROUP" when describing broker configs using latest admin client
Rohan Desai created KAFKA-7064: -- Summary: "Unexpected resource type GROUP" when describing broker configs using latest admin client Key: KAFKA-7064 URL: https://issues.apache.org/jira/browse/KAFKA-7064 Project: Kafka Issue Type: Bug Affects Versions: 2.0.0 Reporter: Rohan Desai I'm getting the following error when I try to describe broker configs using the admin client: ```org.apache.kafka.common.errors.InvalidRequestException: Unexpected resource type GROUP for resource 0``` I think it's due to this commit: https://github.com/apache/kafka/commit/49db5a63c043b50c10c2dfd0648f8d74ee917b6a My guess at what's going on is that now that the client is using ConfigResource instead of Resource, it's sending a describe request for resource type BROKER with id 3, while the broker associates id 3 with GROUP. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
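The suspected mismatch can be sketched like this. The numeric ids below are hypothetical, chosen only to mirror the report's description; they are not the real Kafka wire values.

```java
// Hypothetical sketch of the suspected client/broker disagreement over
// resource-type ids; the numeric values are illustrative, not Kafka's.
public class ResourceTypeMismatch {
    // What the new client (using ConfigResource) encodes for BROKER.
    static final byte CLIENT_BROKER_ID = 3;

    // How the broker (still using the old Resource mapping) decodes the byte.
    static String brokerDecode(byte id) {
        switch (id) {
            case 2:  return "TOPIC";
            case 3:  return "GROUP";   // collision: the client meant BROKER
            case 4:  return "BROKER";
            default: return "UNKNOWN";
        }
    }

    public static void main(String[] args) {
        // The broker would interpret the describe request as targeting a
        // GROUP and reject it, matching the error quoted above.
        System.out.println(brokerDecode(CLIENT_BROKER_ID));
    }
}
```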
[jira] [Reopened] (KAFKA-6383) StreamThread.shutdown doesn't clean up completely when called before StreamThread.start
[ https://issues.apache.org/jira/browse/KAFKA-6383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rohan Desai reopened KAFKA-6383: I missed a race condition in my fix. If we start and then shut down a streams thread without the streams thread running in between, then shutdown() throws an IllegalThreadStateException. This happens because shutdown() uses StreamThread.state to decide whether to call start(), and the state is transitioned from run(), which may not have executed yet. > StreamThread.shutdown doesn't clean up completely when called before > StreamThread.start > --- > > Key: KAFKA-6383 > URL: https://issues.apache.org/jira/browse/KAFKA-6383 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 1.0.0 > Reporter: Rohan Desai > Assignee: Rohan Desai > Fix For: 1.1.0 > > > The following code leaks a producer network thread: > {code} > ks = new KafkaStreams(...); > ks.close(); > {code} > The underlying issue is that KafkaStreams creates a bunch of StreamsThreads > via StreamThread.create, which in turn creates a bunch of stuff (including a > producer). These resources are cleaned up only when the thread exits. So if > the thread was never started, then they are never cleaned up. > StreamThread.shutdown should clean up if it sees that the thread has never > been started. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6383) StreamThread.shutdown doesn't clean up completely when called before StreamThread.start
Rohan Desai created KAFKA-6383: -- Summary: StreamThread.shutdown doesn't clean up completely when called before StreamThread.start Key: KAFKA-6383 URL: https://issues.apache.org/jira/browse/KAFKA-6383 Project: Kafka Issue Type: Bug Reporter: Rohan Desai The following code leaks a producer network thread: {code} ks = new KafkaStreams(...); ks.close(); {code} The underlying issue is that KafkaStreams creates a bunch of StreamsThreads via StreamThread.create, which in turn creates a bunch of stuff (including a producer). These resources are cleaned up only when the thread exits. So if the thread was never started, then they are never cleaned up. StreamThread.shutdown should clean up if it sees that the thread has never been started. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
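The leak pattern and the shutdown-before-start fix can be sketched as follows. This is a minimal illustrative model, not Kafka's actual StreamThread code; all names are hypothetical.

```java
// Sketch of the pattern described above: a resource created at construction
// is released only in run(), so a never-started thread leaks it. The fix
// makes shutdown() release it directly when the thread was never started.
public class CleanupSketch extends Thread {
    private final AutoCloseable resource;  // stands in for the producer
    private volatile boolean closed = false;

    CleanupSketch(AutoCloseable resource) { this.resource = resource; }

    @Override
    public void run() {
        try {
            // ... process records ...
        } finally {
            closeResource();  // in the buggy version, cleanup ran only here
        }
    }

    // Checking Thread.getState() == NEW avoids the race noted in the reopen:
    // a state flag updated inside run() may still be unset just after
    // start() returns, whereas getState() leaves NEW as soon as start() is
    // called.
    void shutdown() {
        if (getState() == State.NEW) {
            closeResource();  // thread never started: run() will never clean up
        } else {
            interrupt();
            try {
                join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private synchronized void closeResource() {
        if (closed) return;
        closed = true;
        try { resource.close(); } catch (Exception e) { /* log and ignore */ }
    }
}
```

With this structure, both `new CleanupSketch(r).shutdown()` (never started) and start-then-shutdown release the resource exactly once.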