[jira] [Commented] (KAFKA-17446) Kafka streams stuck in rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-17446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17885159#comment-17885159 ]

A. Sophie Blee-Goldman commented on KAFKA-17446:
------------------------------------------------

[~rohitbobade] is this ticket a duplicate of https://issues.apache.org/jira/browse/KAFKA-17445 ? Can we close this one?

> Kafka streams stuck in rebalancing
> ----------------------------------
>
>                 Key: KAFKA-17446
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17446
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.8.0
>            Reporter: Rohit Bobade
>            Priority: Major
>
> Kafka streams stuck in endless rebalancing with the following error:
> org.apache.kafka.streams.errors.LockException: stream-thread task [0_1] Failed to lock the state directory for task 0_1
> org.apache.kafka.streams.processor.internals.TaskManager - stream-thread Encountered lock exception. Reattempting locking the state in the next iteration.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (KAFKA-16955) ConcurrentModification exception thrown by KafkaStream threadState access
[ https://issues.apache.org/jira/browse/KAFKA-16955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman updated KAFKA-16955:
-------------------------------------------
    Fix Version/s: 3.8.1

> ConcurrentModification exception thrown by KafkaStream threadState access
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-16955
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16955
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.7.0
>            Reporter: Rohan Desai
>            Assignee: Rohan Desai
>            Priority: Major
>             Fix For: 3.9.0, 3.8.1
>
> We see occasional ConcurrentModificationExceptions thrown when accessing threadState:
> {code:java}
> [ERROR] 2024-06-11 07:56:42.417 [e2e-de9a85a4-b1d9-4807-bc34-e269f760bcd7-StreamThread-2] ResponsiveKafkaStreams - stream-client [e2e-de9a85a4-b1d9-4807-bc34-e269f760bcd7] Replacing thread in the streams uncaught exception handler
> org.apache.kafka.streams.errors.StreamsException: java.util.ConcurrentModificationException
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:729) ~[kafka-streams-3.7.0.jar:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645) [kafka-streams-3.7.0.jar:?]
> Caused by: java.util.ConcurrentModificationException
> 	at java.util.HashMap$HashIterator.nextNode(HashMap.java:1605) ~[?:?]
> 	at java.util.HashMap$ValueIterator.next(HashMap.java:1633) ~[?:?]
> 	at org.apache.kafka.streams.KafkaStreams$StreamStateListener.maybeSetRunning(KafkaStreams.java:656) ~[kafka-streams-3.7.0.jar:?]
> 	at org.apache.kafka.streams.KafkaStreams$StreamStateListener.onChange(KafkaStreams.java:686) ~[kafka-streams-3.7.0.jar:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:261) ~[kafka-streams-3.7.0.jar:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:1120) ~[kafka-streams-3.7.0.jar:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:921) ~[kafka-streams-3.7.0.jar:?]
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686) ~[kafka-streams-3.7.0.jar:?]
> 	... 1 more
> {code}
[jira] [Created] (KAFKA-17441) Add RETRY option to other exception handlers
A. Sophie Blee-Goldman created KAFKA-17441:
-------------------------------------------

             Summary: Add RETRY option to other exception handlers
                 Key: KAFKA-17441
                 URL: https://issues.apache.org/jira/browse/KAFKA-17441
             Project: Kafka
          Issue Type: Improvement
          Components: streams
            Reporter: A. Sophie Blee-Goldman

In KIP-1065 we added a RETRY response option to the ProductionExceptionHandler. However, retrying an action instead of either crashing or dropping records would be useful for the other exception handlers as well. We should consider a follow-up KIP to add a RETRY response option to the ProcessingExceptionHandler and/or DeserializationExceptionHandler.
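[Editor's note] A sketch of what a RETRY response could mean for a handler, as a minimal stand-alone illustration. The `Response` enum, the `processWithRetry` driver, and the bound on attempts are all hypothetical; this is not the KIP-1065 API.

```java
import java.util.function.Supplier;

public class RetryHandlerSketch {
    // Hypothetical response enum: FAIL and CONTINUE mirror the existing
    // crash-or-drop choices; RETRY is the proposed third option.
    public enum Response { FAIL, CONTINUE, RETRY }

    // Hypothetical driver loop: on failure, either crash (FAIL), drop the
    // record (CONTINUE), or re-attempt the same record up to maxRetries (RETRY).
    public static boolean processWithRetry(Supplier<Boolean> action, Response onError, int maxRetries) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (action.get()) {
                return true;                         // record processed successfully
            }
            if (onError == Response.FAIL) {
                throw new IllegalStateException("processing failed");
            }
            if (onError == Response.CONTINUE) {
                return false;                        // drop the record and move on
            }
            // Response.RETRY: fall through and try the same record again
        }
        throw new IllegalStateException("retries exhausted");
    }

    public static void main(String[] args) {
        // A flaky action that succeeds on its third attempt.
        int[] calls = {0};
        boolean ok = processWithRetry(() -> ++calls[0] >= 3, Response.RETRY, 5);
        System.out.println(ok + " after " + calls[0] + " attempts"); // true after 3 attempts
    }
}
```

In a real KIP the retry bound and backoff would presumably come from existing configs rather than a hard-coded parameter.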
[jira] [Commented] (KAFKA-13499) Avoid restoring outdated records
[ https://issues.apache.org/jira/browse/KAFKA-13499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17861282#comment-17861282 ]

A. Sophie Blee-Goldman commented on KAFKA-13499:
------------------------------------------------

A few quick clarification questions for you [~mjsax]:
# Is this ticket substantively different from KAFKA-7934?
# Why "especially for stream-stream joins"?

> Avoid restoring outdated records
> --------------------------------
>
>                 Key: KAFKA-13499
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13499
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Danica Fine
>            Priority: Major
>
> Kafka Streams has the config `windowstore.changelog.additional.retention.ms` to allow for an increased retention time.
> While an increased retention time can be useful, it can also lead to unnecessary restore cost, especially for stream-stream joins. Assume a stream-stream join with a 1h window size and a grace period of 1h. For this case, we only need 2h of data to restore. If we lag, the `windowstore.changelog.additional.retention.ms` helps to prevent the broker from truncating data too early. However, if we don't lag and we need to restore, we restore everything from the changelog.
> Instead of doing a seek-to-beginning, we could use the timestamp index to seek the first offset older than the 2h "window" of data that we need to restore, to avoid unnecessary work.
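[Editor's note] The window arithmetic in the ticket description can be sketched as a one-line helper; the helper name and parameters are illustrative, not Streams code.

```java
public class RestoreWindowSketch {
    // For a windowed store backing a stream-stream join, only records within
    // window + grace of "now" can still affect the result, so restoration
    // could begin at this timestamp instead of seeking to the beginning of
    // the changelog. (Illustrative helper, not actual Streams code.)
    public static long restoreStartTimestamp(long nowMs, long windowMs, long graceMs) {
        return nowMs - (windowMs + graceMs);
    }

    public static void main(String[] args) {
        long hour = 60 * 60 * 1000L;
        // 1h window + 1h grace => only the last 2h of changelog data is needed.
        System.out.println(restoreStartTimestamp(10 * hour, hour, hour)); // 28800000 (= 8h)
    }
}
```

The resulting timestamp could then be resolved to a starting offset via the consumer's `offsetsForTimes` lookup, which is what "use the timestamp index" amounts to from the client side.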
[jira] [Commented] (KAFKA-14567) Kafka Streams crashes after ProducerFencedException
[ https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17855179#comment-17855179 ]

A. Sophie Blee-Goldman commented on KAFKA-14567:
------------------------------------------------

[~cadonna] [~mjsax] do you guys need any help on this? I assume it is a blocker for 3.8.0 so I would be happy to pitch in and help unblock the release (if you can't already tell, I am eager to start using some new features in 3.8.0, one in particular ;))

> Kafka Streams crashes after ProducerFencedException
> ---------------------------------------------------
>
>                 Key: KAFKA-14567
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14567
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.7.0
>            Reporter: Matthias J. Sax
>            Priority: Blocker
>              Labels: eos
>             Fix For: 3.8.0
>
> Running a Kafka Streams application with EOS-v2.
> We first see a `ProducerFencedException`. After the fencing, the fenced thread crashed resulting in a non-recoverable error:
> {quote}
> [2022-12-22 13:49:13,423] ERROR [i-0c291188ec2ae17a0-StreamThread-3] stream-thread [i-0c291188ec2ae17a0-StreamThread-3] Failed to process stream task 1_2 due to the following error: (org.apache.kafka.streams.processor.internals.TaskExecutor)
> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_2, processor=KSTREAM-SOURCE-05, topic=node-name-repartition, partition=2, offset=539776276, stacktrace=java.lang.IllegalStateException: TransactionalId stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: Invalid transition attempted from state FATAL_ERROR to state ABORTABLE_ERROR
> 	at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
> 	at org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394)
> 	at org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620)
> 	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079)
> 	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959)
> 	at org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257)
> 	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207)
> 	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
> 	at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> 	at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> 	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:791)
> 	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:791)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:722)
> 	at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
> 	at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76)
> 	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(Stream
[jira] [Commented] (KAFKA-16945) Cleanup StreamsBuilder and TopologyConfig
[ https://issues.apache.org/jira/browse/KAFKA-16945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854565#comment-17854565 ]

A. Sophie Blee-Goldman commented on KAFKA-16945:
------------------------------------------------

[~mjsax] are you sure we don't already have a ticket for this (possibly even two)? Otherwise I wholeheartedly agree with this; the current state of things is so messy.

> Cleanup StreamsBuilder and TopologyConfig
> -----------------------------------------
>
>                 Key: KAFKA-16945
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16945
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Minor
>              Labels: needs-kip
>
> Historically, Kafka Streams offers two ways to build a topology: either via the PAPI by creating a `new Topology()` explicitly, or via the `StreamsBuilder` which returns a topology via the `build()` method.
> We later added an overload `build(Properties)` to enable topology optimizations for the DSL layer.
> Furthermore, we also added a `TopologyConfig` object, which can be passed into `new Topology(TopologyConfig)` as well as `StreamsBuilder(TopologyConfig)`.
> We should consider unifying the different approaches to simplify the rather complex API we have right now.
[jira] [Commented] (KAFKA-16903) Task should consider producer error previously occurred for different task
[ https://issues.apache.org/jira/browse/KAFKA-16903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853456#comment-17853456 ]

A. Sophie Blee-Goldman commented on KAFKA-16903:
------------------------------------------------

[~cadonna] is this really a regression that only affects 3.7, or has it been around for longer and you just happened to catch it in 3.7? Just wondering because we were running some testing on a slightly older version (3.4 or 3.6, don't remember which) and I think we might have seen this.

> Task should consider producer error previously occurred for different task
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-16903
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16903
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.7.0
>            Reporter: Bruno Cadonna
>            Assignee: Bruno Cadonna
>            Priority: Major
>             Fix For: 3.8.0
>
> A task does not consider a producer error that occurred for a different task.
> The following log messages show the issue.
> Task {{0_2}} of a Streams app (EOSv2 enabled) crashes while sending records with an {{InvalidTxnStateException}}:
> {code:java}
> [2024-05-30 10:20:35,881] ERROR [kafka-producer-network-thread | i-0af25f5c2bd9bba31-StreamThread-1-producer] stream-thread [i-0af25f5c2bd9bba31-StreamThread-1] stream-task [0_2] Error encountered sending record to topic stream-soak-test-node-name-repartition for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
> Exception handler choose to FAIL the processing, no more records would be sent. (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
>
> [2024-05-30 10:20:35,886] ERROR [i-0af25f5c2bd9bba31-StreamThread-1] stream-thread [i-0af25f5c2bd9bba31-StreamThread-1] Failed to process stream task 0_2 due to the following error: (org.apache.kafka.streams.processor.internals.TaskExecutor)
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic stream-soak-test-node-name-repartition for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
> Exception handler choose to FAIL the processing, no more records would be sent.
> 	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:316)
> 	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:285)
> 	at org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1565)
> 	at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:311)
> 	at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:272)
> 	at org.apache.kafka.clients.producer.internals.ProducerBatch.completeExceptionally(ProducerBatch.java:236)
> 	at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:829)
> 	at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:818)
> 	at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:770)
> 	at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:702)
> 	at org.apache.kafka.clients.producer.internals.Sender.lambda$null$2(Sender.java:627)
> 	at java.util.ArrayList.forEach(ArrayList.java:1259)
> 	at org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$3(Sender.java:612)
> 	at java.lang.Iterable.forEach(Iterable.java:75)
> 	at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:612)
> 	at org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$9(Sender.java:916)
> 	at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
> 	at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608)
> 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600)
> 	at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:348)
> 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:250)
> 	at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.
> {code}
> Just before the exception of task 0_2 also task 0_
[jira] [Updated] (KAFKA-16916) ClientAuthenticationFailureTest.testAdminClientWithInvalidCredentials will run forever
[ https://issues.apache.org/jira/browse/KAFKA-16916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman updated KAFKA-16916:
-------------------------------------------
    Priority: Blocker  (was: Major)

> ClientAuthenticationFailureTest.testAdminClientWithInvalidCredentials will run forever
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16916
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16916
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Luke Chen
>            Assignee: Haruki Okada
>            Priority: Blocker
>
> ClientAuthenticationFailureTest.testAdminClientWithInvalidCredentials will run forever
[jira] [Updated] (KAFKA-16916) ClientAuthenticationFailureTest.testAdminClientWithInvalidCredentials will run forever
[ https://issues.apache.org/jira/browse/KAFKA-16916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman updated KAFKA-16916:
-------------------------------------------
    Fix Version/s: 3.8.0

> ClientAuthenticationFailureTest.testAdminClientWithInvalidCredentials will run forever
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16916
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16916
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Luke Chen
>            Assignee: Haruki Okada
>            Priority: Blocker
>             Fix For: 3.8.0
>
> ClientAuthenticationFailureTest.testAdminClientWithInvalidCredentials will run forever
[jira] [Updated] (KAFKA-15045) Move Streams task assignor to public configs
[ https://issues.apache.org/jira/browse/KAFKA-15045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman updated KAFKA-15045:
-------------------------------------------
    Description: https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams

> Move Streams task assignor to public configs
> --------------------------------------------
>
>                 Key: KAFKA-15045
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15045
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>            Reporter: A. Sophie Blee-Goldman
>            Assignee: A. Sophie Blee-Goldman
>            Priority: Major
>              Labels: kip
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams
[jira] [Created] (KAFKA-16875) Replace ClientState with TaskAssignment when creating individual consumer Assignments
A. Sophie Blee-Goldman created KAFKA-16875:
-------------------------------------------

             Summary: Replace ClientState with TaskAssignment when creating individual consumer Assignments
                 Key: KAFKA-16875
                 URL: https://issues.apache.org/jira/browse/KAFKA-16875
             Project: Kafka
          Issue Type: Sub-task
            Reporter: A. Sophie Blee-Goldman

In the initial implementation of KIP-924 in version 3.8, we converted from the new TaskAssignor's output type (TaskAssignment) into the old ClientState-based assignment representation. This allowed us to plug in a custom assignor without converting all the internal mechanisms that take the KafkaStreams client-level assignment and process it into a consumer-level assignment. However, we ultimately want to get rid of ClientState altogether, so we need to invert this logic: we should instead convert the ClientState into a TaskAssignment and then use the TaskAssignment to process the assigned tasks into consumer Assignments.
[jira] [Created] (KAFKA-16874) Remove old TaskAssignor interface
A. Sophie Blee-Goldman created KAFKA-16874:
-------------------------------------------

             Summary: Remove old TaskAssignor interface
                 Key: KAFKA-16874
                 URL: https://issues.apache.org/jira/browse/KAFKA-16874
             Project: Kafka
          Issue Type: Sub-task
            Reporter: A. Sophie Blee-Goldman

Once we have the new HAAssignor that implements the new TaskAssignor interface, we can remove the old TaskAssignor interface.
[jira] [Created] (KAFKA-16873) Remove StreamsConfig.INTERNAL_TASK_ASSIGNOR_CLASS
A. Sophie Blee-Goldman created KAFKA-16873:
-------------------------------------------

             Summary: Remove StreamsConfig.INTERNAL_TASK_ASSIGNOR_CLASS
                 Key: KAFKA-16873
                 URL: https://issues.apache.org/jira/browse/KAFKA-16873
             Project: Kafka
          Issue Type: Sub-task
            Reporter: A. Sophie Blee-Goldman

Once we have all the out-of-the-box assignors implementing the new TaskAssignor interface that corresponds to the new public task assignor config, we can remove the old internal task assignor config altogether.
[jira] [Created] (KAFKA-16872) Remove ClientState class
A. Sophie Blee-Goldman created KAFKA-16872:
-------------------------------------------

             Summary: Remove ClientState class
                 Key: KAFKA-16872
                 URL: https://issues.apache.org/jira/browse/KAFKA-16872
             Project: Kafka
          Issue Type: Sub-task
            Reporter: A. Sophie Blee-Goldman

One of the end-state goals of KIP-924 is to remove the ClientState class altogether. There are some blockers to this, such as the removal of the old internal task assignor config and the old HAAssignor, so this ticket will probably be one of the very last KAFKA-16868 subtasks to be tackled.
[jira] [Created] (KAFKA-16871) Clean up internal AssignmentConfigs class in Streams
A. Sophie Blee-Goldman created KAFKA-16871:
-------------------------------------------

             Summary: Clean up internal AssignmentConfigs class in Streams
                 Key: KAFKA-16871
                 URL: https://issues.apache.org/jira/browse/KAFKA-16871
             Project: Kafka
          Issue Type: Sub-task
            Reporter: A. Sophie Blee-Goldman

In KIP-924 we added a new public AssignmentConfigs class to hold all of the, you guessed it, assignment-related configs. However, there is an existing config class of the same name and largely the same contents that lives in an internal package, specifically inside the AssignorConfiguration class. We should remove the old AssignmentConfigs class that's in AssignorConfiguration and replace any usages of it with the new public AssignmentConfigs that we added in KIP-924.
[jira] [Created] (KAFKA-16869) Rewrite HighAvailabilityTaskAssignor to implement the new TaskAssignor interface
A. Sophie Blee-Goldman created KAFKA-16869:
-------------------------------------------

             Summary: Rewrite HighAvailabilityTaskAssignor to implement the new TaskAssignor interface
                 Key: KAFKA-16869
                 URL: https://issues.apache.org/jira/browse/KAFKA-16869
             Project: Kafka
          Issue Type: Sub-task
          Components: streams
            Reporter: A. Sophie Blee-Goldman

We need to add a new HighAvailabilityTaskAssignor that implements the new TaskAssignor interface. Once we have that, we need to remember to also make these related changes:
# Change the StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG default from null to the new HAAssignor
# Check for this new HAAssignor type when evaluating the OptionalInt rack-aware assignment configs in the public AssignmentConfigs class. If these configs are Optional.empty() and the new HAAssignor is used, they should be overridden to the HAAssignor-specific default values. This code already exists but should be updated to check for the new HAAssignor class name instead of "null"
# Until the old HAAssignor and old internal task assignor config can be removed completely, make sure the new HAAssignor is used by default when a TaskAssignor is selected in StreamsPartitionAssignor
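[Editor's note] Point 2 above can be sketched roughly as follows. The constant value, method name, and the fallback of 0 for non-HA assignors are hypothetical placeholders; the real defaults live inside Streams internals.

```java
import java.util.OptionalInt;

public class AssignorDefaultsSketch {
    // Hypothetical HA-specific default for illustration only.
    public static final int HA_DEFAULT_TRAFFIC_COST = 10;

    // An unset OptionalInt config falls back to the HA-specific default only
    // when the configured assignor is the new HAAssignor; per the ticket, the
    // check is against the class name rather than against null.
    public static int effectiveTrafficCost(OptionalInt configured, String assignorClassName) {
        if (configured.isPresent()) {
            return configured.getAsInt();           // explicit user value always wins
        }
        boolean isHaAssignor = "HighAvailabilityTaskAssignor".equals(assignorClassName);
        return isHaAssignor ? HA_DEFAULT_TRAFFIC_COST : 0; // 0: placeholder non-HA default
    }

    public static void main(String[] args) {
        System.out.println(effectiveTrafficCost(OptionalInt.empty(), "HighAvailabilityTaskAssignor")); // 10
        System.out.println(effectiveTrafficCost(OptionalInt.of(5), "HighAvailabilityTaskAssignor"));   // 5
    }
}
```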
[jira] [Created] (KAFKA-16868) Post KIP-924 StreamsPartitionAssignor code cleanup
A. Sophie Blee-Goldman created KAFKA-16868:
-------------------------------------------

             Summary: Post KIP-924 StreamsPartitionAssignor code cleanup
                 Key: KAFKA-16868
                 URL: https://issues.apache.org/jira/browse/KAFKA-16868
             Project: Kafka
          Issue Type: Improvement
            Reporter: A. Sophie Blee-Goldman

Making an umbrella task for all of the tech debt and code consolidation cleanup work that can/should be done following the implementation of [KIP-924: customizable task assignment for Streams|https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams]

Most of this revolves around deduplicating code once it's no longer needed, including classes like the ClientState, StandbyTaskAssignor and related elements, and the old TaskAssignor interface along with its implementations.

Note that in 3.8, the first version in which KIP-924 was released, we just added the new public config and new TaskAssignor interface but did not get rid of the old internal config or old TaskAssignor interface. If neither config is set in 3.8 we still default to the old HAAssignor, as a kind of opt-in feature flag, and internally will convert the output of the new TaskAssignor into the old style of ClientState-based assignment tracking. We intend to clean up all of the old code and eventually support only the new TaskAssignor interface, as well as converting everything internally from the ClientState to the TaskAssignment/KafkaStreamsAssignment style output.
[jira] [Created] (KAFKA-16867) Streams should run tag-based standby assignment based on rack ids
A. Sophie Blee-Goldman created KAFKA-16867:
-------------------------------------------

             Summary: Streams should run tag-based standby assignment based on rack ids
                 Key: KAFKA-16867
                 URL: https://issues.apache.org/jira/browse/KAFKA-16867
             Project: Kafka
          Issue Type: Bug
          Components: streams
            Reporter: A. Sophie Blee-Goldman

In KIP-708, we introduced a tag-based standby task assignment algorithm that runs if the user has configured their clients with "rack aware assignment tags". If no tags are configured, the default load-based standby task assignment algorithm is run instead.

In KIP-924 we introduced a different kind of rack-aware assignment logic which is based on the "rack.id" of the consumers and topic partitions. While this did not replace the tag-based rack-aware assignment of KIP-708, which had different (and opposing) goals, we realized that Streams could leverage the rack.ids to run the tag-based standby task assignment algorithm even if clients were not configured with assignment tags.

Unfortunately, during implementation of KIP-924, a bug in the logic meant that the tag-based algorithm was never actually being run based on the rack ids. This bug is present to this day and carried over (intentionally) during the task assignor refactoring of KIP-924. We should still fix this bug so that users can benefit from the resiliency of KIP-708 based on consumer rack ids, even if they did not explicitly opt in by configuring clients with assignment tags, since KIP-708 is a net benefit with no downside.
[jira] [Commented] (KAFKA-16863) Consider removing `default.` prefix for exception handler config
[ https://issues.apache.org/jira/browse/KAFKA-16863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17850587#comment-17850587 ]

A. Sophie Blee-Goldman commented on KAFKA-16863:
------------------------------------------------

If we're going to do a config name cleanup, maybe we can do a pass and fix all the current issues. For example, I just happened to notice that two of the new rack-aware assignment configs use underscores in part of their config name, which does not follow the established pattern and is easily messed up by users who depend on config files or otherwise configure things without the aid of the Java config variable.

For example, "rack.aware.assignment.traffic_cost" should be "rack.aware.assignment.traffic.cost"

These are very new configs, so maybe the disruption isn't worth it. On the other hand, if we can make this change now, maybe we can fix it before too many people start using the new feature and these underscore-based configs?

> Consider removing `default.` prefix for exception handler config
> ----------------------------------------------------------------
>
>                 Key: KAFKA-16863
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16863
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Trivial
>              Labels: need-kip
>
> Kafka Streams has a set of configs with `default.` prefix. The intent for the default-prefix is to make a distinction between, well, the default, and in-place overwrites in the code. Eg, users can specify ts-extractors on a per-topic basis.
> However, for the deserialization- and production-exception handlers, no such overwrites are possible, and thus `default.` does not really make sense, because there is just one handler overall. Via KIP-1033 we added a new processing-exception handler w/o a default-prefix, too.
> Thus, we should consider to deprecate the two existing config names and add them back w/o the `default.` prefix.
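[Editor's note] The naming convention at issue, dot-separated segments with no underscores, can be illustrated with a trivial rewrite helper. The helper itself is hypothetical; the two underscore-bearing config names are the real Streams configs `rack.aware.assignment.traffic_cost` and `rack.aware.assignment.non_overlap_cost`.

```java
public class ConfigNameCheckSketch {
    // Illustrative helper: rewrite a config name to the dot-separated
    // convention discussed in the comment above.
    public static String suggestDottedName(String configName) {
        return configName.replace('_', '.');
    }

    public static void main(String[] args) {
        // The underscore-based rack-aware configs and their conventional forms:
        System.out.println(suggestDottedName("rack.aware.assignment.traffic_cost"));     // rack.aware.assignment.traffic.cost
        System.out.println(suggestDottedName("rack.aware.assignment.non_overlap_cost")); // rack.aware.assignment.non.overlap.cost
    }
}
```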
[jira] [Commented] (KAFKA-16586) Test TaskAssignorConvergenceTest failing
[ https://issues.apache.org/jira/browse/KAFKA-16586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848363#comment-17848363 ]

A. Sophie Blee-Goldman commented on KAFKA-16586:
------------------------------------------------

By the way, if this test is failing, it's highly likely that this is a real bug. This test was designed to be completely deterministic (hence the seed) and doesn't set up any actual clients or clusters or anything that might make it flaky. [~mjsax] if you see this again can you report the failure here (including both the seed and rackAwareStrategy parameter)?

> Test TaskAssignorConvergenceTest failing
> ----------------------------------------
>
>                 Key: KAFKA-16586
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16586
>             Project: Kafka
>          Issue Type: Test
>          Components: streams, unit tests
>            Reporter: Matthias J. Sax
>            Priority: Major
>
> {code:java}
> java.lang.AssertionError: Assertion failed in randomized test. Reproduce with: `runRandomizedScenario(-538095696758490522)`.
> 	at org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.runRandomizedScenario(TaskAssignorConvergenceTest.java:545)
> 	at org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.randomClusterPerturbationsShouldConverge(TaskAssignorConvergenceTest.java:479){code}
> This might expose an actual bug (or incorrect test setup) and should be reproducible (did not try it myself yet).
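[Editor's note] The determinism the comment relies on is the standard seeded-PRNG property: the same seed replays the identical randomized scenario on every run, which is why reporting the seed is enough to reproduce a failure. A minimal stand-alone illustration (not the actual TaskAssignorConvergenceTest harness):

```java
import java.util.Arrays;
import java.util.Random;

public class SeededReproSketch {
    // Generate a "scenario" as a sequence of pseudo-random perturbations.
    // Seeding the PRNG makes the whole sequence a pure function of the seed.
    public static long[] scenario(long seed, int steps) {
        Random random = new Random(seed);
        long[] perturbations = new long[steps];
        for (int i = 0; i < steps; i++) {
            perturbations[i] = random.nextLong();
        }
        return perturbations;
    }

    public static void main(String[] args) {
        long seed = -538095696758490522L; // the seed from the ticket description
        // Two independent runs with the same seed yield the identical scenario.
        System.out.println(Arrays.equals(scenario(seed, 100), scenario(seed, 100))); // true
    }
}
```

The seed alone pins down the scenario, but as the comments note, the parameterized test name (e.g. the rackAwareStrategy value) is still needed to know which assignor variant was exercised.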
[jira] [Commented] (KAFKA-16586) Test TaskAssignorConvergenceTest failing
[ https://issues.apache.org/jira/browse/KAFKA-16586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848358#comment-17848358 ]

A. Sophie Blee-Goldman commented on KAFKA-16586:
------------------------------------------------

Note: when reporting an instance of failure for this test, please include both the seed (contained in the stack trace, eg "runRandomizedScenario(-3595932977400264775)") and the full test name including parameters (not present in the original ticket description, eg "rackAwareStrategy=balance_subtopology").

We need the test name to figure out which variant of the assignor was running. This will help narrow down the issue to a specific rackAwareStrategy, as well as enable reproduction of the failed test.
[jira] [Commented] (KAFKA-16586) Test TaskAssignorConvergenceTest failing
[ https://issues.apache.org/jira/browse/KAFKA-16586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848351#comment-17848351 ] A. Sophie Blee-Goldman commented on KAFKA-16586: Failed again: {code:java} randomClusterPerturbationsShouldConverge[rackAwareStrategy=balance_subtopology] – org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest Stacktrace: java.lang.AssertionError: Assertion failed in randomized test. Reproduce with: `runRandomizedScenario(-3595932977400264775)`. {code} > Test TaskAssignorConvergenceTest failing > > > Key: KAFKA-16586 > URL: https://issues.apache.org/jira/browse/KAFKA-16586 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Matthias J. Sax >Priority: Major > > {code:java} > java.lang.AssertionError: Assertion failed in randomized test. Reproduce > with: `runRandomizedScenario(-538095696758490522)`. at > org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.runRandomizedScenario(TaskAssignorConvergenceTest.java:545) > at > org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.randomClusterPerturbationsShouldConverge(TaskAssignorConvergenceTest.java:479){code} > This might expose an actual bug (or incorrect test setup) and should be > reproducible (did not try it myself yet). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16361) Rack aware sticky assignor minQuota violations
[ https://issues.apache.org/jira/browse/KAFKA-16361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846467#comment-17846467 ] A. Sophie Blee-Goldman edited comment on KAFKA-16361 at 5/15/24 12:24 AM: -- Thanks, I think it's safe to say this is related to the rack-aware assignment code that was added in 3.5. Probably the same issue that [~flashmouse] found in KAFKA-15170 Fortunately I just merged that fix and cherrypicked it back to 3.7, so the patch should be included in both the upcoming 3.8 release and the 3.7.1 bugfix release, whenever that happens. Not sure of the timing for 3.7.1 but 3.8 is just a day from KIP freeze which means if all goes well, it will be available in a little over a month. If you need an immediate resolution in the meantime then you have two options: 1) disable rack-awareness which will effectively make the assignor just skip over the buggy code 2) if you can build from source and don't require an official release, just cherrypick [this fix|https://github.com/apache/kafka/pull/13965] to a branch with whatever version you'd like to use and compile it yourself. I wouldn't recommend building directly from trunk for a production environment since that contains untested code, but you can at least run your test again using the latest trunk build if you want to make sure that it fixes the issue you're experiencing. I'm pretty confident it will though was (Author: ableegoldman): Thanks, I think it's safe to say this is related to the rack-aware assignment code that was added in 3.5. Probably the same issue that [~flashmouse] found in [KAFKA-15170|https://issues.apache.org/jira/browse/KAFKA-15170] Fortunately I just merged that fix and cherrypicked it back to 3.7, so the patch should be included in both the upcoming 3.8 release and the 3.7.1 bugfix release, whenever that happens. 
Not sure of the timing for 3.7.1 but 3.8 is just a day from KIP freeze which means if all goes well, it will be available in a little over a month. If you need an immediate resolution in the meantime then you have two options: 1) disable rack-awareness which will effectively make the assignor just skip over the buggy code 2) if you can build from source and don't require an official release, just cherrypick [this fix|https://github.com/apache/kafka/pull/13965] to a branch with whatever version you'd like to use and compile it yourself. I wouldn't recommend building directly from trunk for a production environment since that contains untested code, but you can at least run your test again using the latest trunk build if you want to make sure that it fixes the issue you're experiencing. I'm pretty confident it will though > Rack aware sticky assignor minQuota violations > -- > > Key: KAFKA-16361 > URL: https://issues.apache.org/jira/browse/KAFKA-16361 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.5.1, 3.7.0, 3.6.1 >Reporter: Luke D >Priority: Major > Attachments: illegalstateexception.log > > > In some low topic replication scenarios the rack aware assignment in the > StickyAssignor fails to balance consumers to its own expectations and throws > an IllegalStateException, commonly crashing the application (depending on > application implementation). While uncommon the error is deterministic, and > so persists until the replication state changes. > > We have observed this in the wild in 3.5.1, and 3.6.1. 
We have reproduced it > locally in a test case in 3.6.1 and 3.7.0 (3.5.1 we did not try but likely > would also be reproducible there) > > Here is the error and stack from our test case against 3.7.0 > {code:java} > We haven't reached the expected number of members with more than the minQuota > partitions, but no more partitions to be assigned > java.lang.IllegalStateException: We haven't reached the expected number of > members with more than the minQuota partitions, but no more partitions to be > assigned > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.verifyUnfilledMembers(AbstractStickyAssignor.java:820) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.build(AbstractStickyAssignor.java:652) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assignPartitions(AbstractStickyAssignor.java:113) > at > org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:91) > {code} > Here is a specific test case from 3.7.0 that fails when passed to > StickyAssignor.assign: > {code:java} > Cluster(id = cluster-id, nodes = [host-3:1 (id: 4 rack: rack-3), host-3:1 > (id: 3 rack: rack
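Option 1 from the comment above (disabling rack-awareness) comes down to consumer configuration. A minimal sketch, with the assumptions labeled: the broker address and group name below are placeholders, and it assumes that the sticky assignors only exercise their rack-aware matching when `client.rack` is set on the consumers, so leaving it unset routes around the buggy code path.

```java
import java.util.Properties;

class DisableRackAwareness {
    // Hypothetical helper, not part of Kafka: builds a consumer config that
    // deliberately omits "client.rack". Without rack information on the
    // members, the sticky assignors are assumed to skip the rack-aware
    // matching added in 3.5 and fall back to the plain sticky algorithm.
    static Properties consumerConfig(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", groupId);
        props.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.StickyAssignor");
        // Deliberately do NOT set "client.rack" here.
        return props;
    }
}
```

This only sidesteps the symptom; option 2 (cherry-picking the fix from the linked PR) addresses the root cause.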
[jira] [Commented] (KAFKA-16361) Rack aware sticky assignor minQuota violations
[ https://issues.apache.org/jira/browse/KAFKA-16361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846467#comment-17846467 ] A. Sophie Blee-Goldman commented on KAFKA-16361: Thanks, I think it's safe to say this is related to the rack-aware assignment code that was added in 3.5. Probably the same issue that [~flashmouse] found in [KAFKA-15170|https://issues.apache.org/jira/browse/KAFKA-15170] Fortunately I just merged that fix and cherrypicked it back to 3.7, so the patch should be included in both the upcoming 3.8 release and the 3.7.1 bugfix release, whenever that happens. Not sure of the timing for 3.7.1 but 3.8 is just a day from KIP freeze which means if all goes well, it will be available in a little over a month. If you need an immediate resolution in the meantime then you have two options: 1) disable rack-awareness which will effectively make the assignor just skip over the buggy code 2) if you can build from source and don't require an official release, just cherrypick [this fix|https://github.com/apache/kafka/pull/13965] to a branch with whatever version you'd like to use and compile it yourself. I wouldn't recommend building directly from trunk for a production environment since that contains untested code, but you can at least run your test again using the latest trunk build if you want to make sure that it fixes the issue you're experiencing. 
I'm pretty confident it will though > Rack aware sticky assignor minQuota violations > -- > > Key: KAFKA-16361 > URL: https://issues.apache.org/jira/browse/KAFKA-16361 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.5.1, 3.7.0, 3.6.1 >Reporter: Luke D >Priority: Major > Attachments: illegalstateexception.log > > > In some low topic replication scenarios the rack aware assignment in the > StickyAssignor fails to balance consumers to its own expectations and throws > an IllegalStateException, commonly crashing the application (depending on > application implementation). While uncommon the error is deterministic, and > so persists until the replication state changes. > > We have observed this in the wild in 3.5.1, and 3.6.1. We have reproduced it > locally in a test case in 3.6.1 and 3.7.0 (3.5.1 we did not try but likely > would also be reproducible there) > > Here is the error and stack from our test case against 3.7.0 > {code:java} > We haven't reached the expected number of members with more than the minQuota > partitions, but no more partitions to be assigned > java.lang.IllegalStateException: We haven't reached the expected number of > members with more than the minQuota partitions, but no more partitions to be > assigned > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.verifyUnfilledMembers(AbstractStickyAssignor.java:820) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.build(AbstractStickyAssignor.java:652) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assignPartitions(AbstractStickyAssignor.java:113) > at > org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:91) > {code} > Here is a specific test case from 3.7.0 that fails when passed to > StickyAssignor.assign: > {code:java} > Cluster(id = cluster-id, nodes = [host-3:1 (id: 4 rack: 
rack-3), host-3:1 > (id: 3 rack: rack-3), host-2:1 (id: 2 rack: rack-2), host-1:1 (id: 1 rack: > rack-1)], partitions = [Partition(topic = topic_name, partition = 57, leader > = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = > topic_name, partition = 90, leader = 2, replicas = [2], isr = [2], > offlineReplicas = []), Partition(topic = topic_name, partition = 28, leader = > 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = > topic_name, partition = 53, leader = 4, replicas = [4], isr = [4], > offlineReplicas = []), Partition(topic = topic_name, partition = 86, leader = > 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = > topic_name, partition = 24, leader = 4, replicas = [4,3,1], isr = [4,3,1], > offlineReplicas = []), Partition(topic = topic_name, partition = 49, leader = > 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = > topic_name, partition = 82, leader = 4, replicas = [4,2], isr = [4,2], > offlineReplicas = []), Partition(topic = topic_name, partition = 20, leader = > 2, replicas = [2,1], isr = [2,1], offlineReplicas = []), Partition(topic = > topic_name, partition = 45, leader = 2, replicas = [2], isr = [2], > offlineReplicas = []), Partition(topic = topic_name, partition = 78, leader =
[jira] [Updated] (KAFKA-15170) CooperativeStickyAssignor cannot adjust assignment correctly
[ https://issues.apache.org/jira/browse/KAFKA-15170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-15170: --- Fix Version/s: 3.8.0 3.7.1 > CooperativeStickyAssignor cannot adjust assignment correctly > > > Key: KAFKA-15170 > URL: https://issues.apache.org/jira/browse/KAFKA-15170 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.5.0 >Reporter: li xiangyuan >Assignee: li xiangyuan >Priority: Major > Fix For: 3.8.0, 3.7.1 > > > AbstractStickyAssignor uses ConstrainedAssignmentBuilder to build the > assignment when all consumers in the group subscribe to the same topic list, > but it fails to add all partitions that moved to a new owner to > `partitionsWithMultiplePreviousOwners`. > > The reason is that assignOwnedPartitions does not add partitions that are > rack-mismatched with their previous owner to allRevokedPartitions, and only > partitions in that list are added to partitionsWithMultiplePreviousOwners. > > In a cooperative rebalance, partitions that have changed owner must be > removed from the final assignment, or it will lead to incorrect consume > behavior. I have already raised a PR, please take a look, thx > > [https://github.com/apache/kafka/pull/13965] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15170) CooperativeStickyAssignor cannot adjust assignment correctly
[ https://issues.apache.org/jira/browse/KAFKA-15170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-15170. Resolution: Fixed > CooperativeStickyAssignor cannot adjust assignment correctly > > > Key: KAFKA-15170 > URL: https://issues.apache.org/jira/browse/KAFKA-15170 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.5.0 >Reporter: li xiangyuan >Assignee: li xiangyuan >Priority: Major > Fix For: 3.8.0, 3.7.1 > > > AbstractStickyAssignor uses ConstrainedAssignmentBuilder to build the > assignment when all consumers in the group subscribe to the same topic list, > but it fails to add all partitions that moved to a new owner to > `partitionsWithMultiplePreviousOwners`. > > The reason is that assignOwnedPartitions does not add partitions that are > rack-mismatched with their previous owner to allRevokedPartitions, and only > partitions in that list are added to partitionsWithMultiplePreviousOwners. > > In a cooperative rebalance, partitions that have changed owner must be > removed from the final assignment, or it will lead to incorrect consume > behavior. I have already raised a PR, please take a look, thx > > [https://github.com/apache/kafka/pull/13965] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group
[ https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-16277: --- Fix Version/s: 3.7.1 > CooperativeStickyAssignor does not spread topics evenly among consumer group > > > Key: KAFKA-16277 > URL: https://issues.apache.org/jira/browse/KAFKA-16277 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Cameron Redpath >Assignee: Cameron Redpath >Priority: Major > Fix For: 3.8.0, 3.7.1 > > Attachments: image-2024-02-19-13-00-28-306.png > > > Consider the following scenario: > `topic-1`: 12 partitions > `topic-2`: 12 partitions > > Of note, `topic-1` gets approximately 10 times more messages through it than > `topic-2`. > > Both of these topics are consumed by a single application, single consumer > group, which scales under load. Each member of the consumer group subscribes > to both topics. The `partition.assignment.strategy` being used is > `org.apache.kafka.clients.consumer.CooperativeStickyAssignor`. The > application may start with one consumer. It consumes all partitions from both > topics. > > The problem begins when the application scales up to two consumers. What is > seen is that all partitions from `topic-1` go to one consumer, and all > partitions from `topic-2` go to the other consumer. In the case with one > topic receiving more messages than the other, this results in a very > imbalanced group where one consumer is receiving 10x the traffic of the other > due to partition assignment. > > This is the issue being seen in our cluster at the moment. See this graph of > the number of messages being processed by each consumer as the group scales > from one to four consumers: > !image-2024-02-19-13-00-28-306.png|width=537,height=612! 
> Things to note from this graphic: > * With two consumers, the partitions for a topic all go to a single consumer > each > * With three consumers, the partitions for a topic are split between two > consumers each > * With four consumers, the partitions for a topic are split between three > consumers each > * The total number of messages being processed by each consumer in the group > is very imbalanced throughout the entire period > > With regard to the number of _partitions_ being assigned to each consumer, > the group is balanced. However, the assignment appears to be biased so that > partitions from the same topic go to the same consumer. In our scenario, this > leads to very undesirable partition assignment. > > I question if the behaviour of the assignor should be revised, so that each > topic has its partitions maximally spread across all available members of the > consumer group. In the above scenario, this would result in much more even > distribution of load. The behaviour would then be: > * With two consumers, 6 partitions from each topic go to each consumer > * With three consumers, 4 partitions from each topic go to each consumer > * With four consumers, 3 partitions from each topic go to each consumer > > Of note, we only saw this behaviour after migrating to the > `CooperativeStickyAssignor`. It was not an issue with the default partition > assignment strategy. > > It is possible this may be intended behaviour. In which case, what is the > preferred workaround for our scenario? Our current workaround if we decide to > go ahead with the update to `CooperativeStickyAssignor` may be to limit our > consumers so they only subscribe to one topic, and have two consumer threads > per instance of the application. -- This message was sent by Atlassian Jira (v8.20.10#820010)
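The per-topic spread the reporter asks for can be illustrated with a small standalone sketch. To be clear, this is not Kafka's assignor (and ignores stickiness entirely); the class and method names are hypothetical. It just shows that round-robining each topic's partitions over the members yields the distribution described above, e.g. 6 partitions of each topic per consumer with two topics of 12 partitions and two members.

```java
import java.util.*;

class PerTopicSpread {
    // Illustrative only: assign each topic's partitions round-robin across
    // members, so every member receives partitions from every topic.
    // Partitions are represented as "topic-partition" strings for simplicity.
    static Map<String, List<String>> assign(Map<String, Integer> topicPartitions,
                                            List<String> members) {
        Map<String, List<String>> assignment = new HashMap<>();
        for (String m : members) assignment.put(m, new ArrayList<>());
        for (Map.Entry<String, Integer> e : topicPartitions.entrySet()) {
            for (int p = 0; p < e.getValue(); p++) {
                // Round-robin within the topic, not across the whole cluster,
                // which is what maximally spreads each topic's load.
                String member = members.get(p % members.size());
                assignment.get(member).add(e.getKey() + "-" + p);
            }
        }
        return assignment;
    }
}
```

Whether this tradeoff (per-topic spread vs. minimizing partition movement) belongs in CooperativeStickyAssignor itself is exactly the open question in the ticket.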
[jira] [Commented] (KAFKA-16758) Extend Consumer#close with option to leave the group or not
[ https://issues.apache.org/jira/browse/KAFKA-16758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846180#comment-17846180 ] A. Sophie Blee-Goldman commented on KAFKA-16758: I'm not super familiar with the new consumer, but for the legacy one at least I can say this should be very straightforward to implement, so I was thinking about applying the "newbie" label. Does anyone have a sense of whether this would be straightforward in the new consumer as well? FWIW I personally do not have time to pick up the KIP anytime soon (though I'd be happy to help shepherd/review it) but I do frequently get asked about good starter work by new contributors, so I'm just wondering if this one would be a reasonable addition to the list. [~lianetm] [~schofielaj] [~dajac] any thoughts? > Extend Consumer#close with option to leave the group or not > --- > > Key: KAFKA-16758 > URL: https://issues.apache.org/jira/browse/KAFKA-16758 > Project: Kafka > Issue Type: New Feature > Components: consumer >Reporter: A. Sophie Blee-Goldman >Priority: Major > Labels: needs-kip > > See comments on https://issues.apache.org/jira/browse/KAFKA-16514 for the > full context. > Essentially we would get rid of the "internal.leave.group.on.close" config > that is used as a backdoor by Kafka Streams right now to prevent closed > consumers from leaving the group, thus reducing unnecessary task movements > after a simple bounce. > This would be replaced by an actual public API that would allow the caller to > opt in or out of the LeaveGroup when close is called. 
This would be similar > to the KafkaStreams#close(CloseOptions) API, and in fact would be how that > API will be implemented (since it only works for static groups at the moment > as noted in KAFKA-16514 ) > This has several benefits over the current situation: > # It allows plain consumer apps to opt-out of leaving the group when closed, > which is currently not possible through any public API (only an internal > backdoor config) > # It enables the caller to dynamically select the appropriate action > depending on why the client is being closed – for example, you would not want > the consumer to leave the group during a simple restart, but would want it to > leave the group when shutting down the app or if scaling down the node. This > is not possible today, even with the internal config, since configs are > immutable > # It can be leveraged to "fix" the KafkaStreams#close(closeOptions) API so > that the user's choice to leave the group during close will be respected for > non-static members -- This message was sent by Atlassian Jira (v8.20.10#820010)
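A rough sketch of what such a close option could look like, modeled loosely on the KafkaStreams#close(CloseOptions) pattern mentioned above. This is purely hypothetical: no such class exists in the consumer API, and an eventual KIP may well choose different names and shapes.

```java
import java.time.Duration;

// Hypothetical API shape for the proposal in KAFKA-16758; all names here
// are illustrative, not actual Kafka classes.
class ConsumerCloseOptions {
    public enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP }

    private final GroupMembershipOperation operation;
    private final Duration timeout;

    private ConsumerCloseOptions(GroupMembershipOperation operation, Duration timeout) {
        this.operation = operation;
        this.timeout = timeout;
    }

    // The caller decides *at close time* whether this member should leave,
    // e.g. leave on scale-down but remain in the group on a rolling bounce.
    static ConsumerCloseOptions leaveGroup(boolean leave) {
        return new ConsumerCloseOptions(
                leave ? GroupMembershipOperation.LEAVE_GROUP
                      : GroupMembershipOperation.REMAIN_IN_GROUP,
                Duration.ofSeconds(30)); // illustrative default timeout
    }

    GroupMembershipOperation operation() { return operation; }
    Duration timeout() { return timeout; }
}
```

A call site would then look something like `consumer.close(ConsumerCloseOptions.leaveGroup(false))` during a restart, which is exactly the dynamic choice the immutable internal config cannot express.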
[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846178#comment-17846178 ] A. Sophie Blee-Goldman commented on KAFKA-16514: FYI I filed https://issues.apache.org/jira/browse/KAFKA-16758 to cover the KIP for this new consumer feature. Let's use this ticket to track the issue in Streams specifically, which is obviously dependent on/blocked by this KIP. > Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup > flag. > --- > > Key: KAFKA-16514 > URL: https://issues.apache.org/jira/browse/KAFKA-16514 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Sal Sorrentino >Priority: Minor > > Working with Kafka Streams 3.7.0, but may affect earlier versions as well. > When attempting to shutdown a streams application and leave the associated > consumer group, the supplied `leaveGroup` option seems to have no effect. > Sample code: > {code:java} > CloseOptions options = new CloseOptions().leaveGroup(true); > stream.close(options);{code} > The expected behavior here is that the group member would shutdown and leave > the group, immediately triggering a consumer group rebalance. In practice, > the rebalance happens after the appropriate timeout configuration has expired. > I understand the default behavior in that there is an assumption that any > associated StateStores would be persisted to disk and that in the case of a > rolling restart/deployment, the rebalance delay may be preferable. However, > in our application we are using in-memory state stores and standby replicas. > There is no benefit in delaying the rebalance in this setup and we are in > need of a way to force a member to leave the group when shutting down. 
> The workaround we found is to set an undocumented internal StreamConfig to > enforce this behavior: > {code:java} > props.put("internal.leave.group.on.close", true); > {code} > To state the obvious, this is less than ideal. > Additional configuration details: > {code:java} > Properties props = new Properties(); > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId"); > props.put( > StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9092,localhost:9093,localhost:9094"); > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); > props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors); > props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, > StreamsConfig.EXACTLY_ONCE_V2);{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
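The reporter's workaround can be shown as a self-contained config sketch. Note the caveats: `internal.leave.group.on.close` is an undocumented internal key, a later comment on this ticket suggests Streams may override it, and it may be removed in any release. The broker address is a placeholder; only the application id and the internal key are taken from the report above.

```java
import java.util.Properties;

class LeaveGroupWorkaround {
    // Sketch of the workaround quoted in the ticket: set the internal config
    // so the Streams consumers send a LeaveGroup request on close. Fragile by
    // design; the proper fix is tracked by the CloseOptions work in KAFKA-16758.
    static Properties streamsConfig() {
        Properties props = new Properties();
        props.put("application.id", "someApplicationId");   // from the report
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("internal.leave.group.on.close", "true"); // the workaround
        return props;
    }
}
```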
[jira] [Created] (KAFKA-16758) Extend Consumer#close with option to leave the group or not
A. Sophie Blee-Goldman created KAFKA-16758: -- Summary: Extend Consumer#close with option to leave the group or not Key: KAFKA-16758 URL: https://issues.apache.org/jira/browse/KAFKA-16758 Project: Kafka Issue Type: New Feature Components: consumer Reporter: A. Sophie Blee-Goldman See comments on https://issues.apache.org/jira/browse/KAFKA-16514 for the full context. Essentially we would get rid of the "internal.leave.group.on.close" config that is used as a backdoor by Kafka Streams right now to prevent closed consumers from leaving the group, thus reducing unnecessary task movements after a simple bounce. This would be replaced by an actual public API that would allow the caller to opt in or out of the LeaveGroup when close is called. This would be similar to the KafkaStreams#close(CloseOptions) API, and in fact would be how that API will be implemented (since it only works for static groups at the moment as noted in KAFKA-16514 ) This has several benefits over the current situation: # It allows plain consumer apps to opt-out of leaving the group when closed, which is currently not possible through any public API (only an internal backdoor config) # It enables the caller to dynamically select the appropriate action depending on why the client is being closed – for example, you would not want the consumer to leave the group during a simple restart, but would want it to leave the group when shutting down the app or if scaling down the node. This is not possible today, even with the internal config, since configs are immutable # It can be leveraged to "fix" the KafkaStreams#close(closeOptions) API so that the user's choice to leave the group during close will be respected for non-static members -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16361) Rack aware sticky assignor minQuota violations
[ https://issues.apache.org/jira/browse/KAFKA-16361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846075#comment-17846075 ] A. Sophie Blee-Goldman commented on KAFKA-16361: [~luked] it would help narrow down whether it was a regression vs a long-standing issue if you can try to reproduce it with some earlier versions. For example I believe the rack-aware assignment was added to the StickyAssignor in 3.5, so the first step might be to see if it's reproducible in 3.4. If so, then try a much older version to see if it's always been around – 2.3 would be a good place to start, since we first implemented the "constrained" algorithm back in 2.4. > Rack aware sticky assignor minQuota violations > -- > > Key: KAFKA-16361 > URL: https://issues.apache.org/jira/browse/KAFKA-16361 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.5.1, 3.7.0, 3.6.1 >Reporter: Luke D >Priority: Major > Attachments: illegalstateexception.log > > > In some low topic replication scenarios the rack aware assignment in the > StickyAssignor fails to balance consumers to its own expectations and throws > an IllegalStateException, commonly crashing the application (depending on > application implementation). While uncommon the error is deterministic, and > so persists until the replication state changes. > > We have observed this in the wild in 3.5.1, and 3.6.1. 
We have reproduced it > locally in a test case in 3.6.1 and 3.7.0 (3.5.1 we did not try but likely > would also be reproducible there) > > Here is the error and stack from our test case against 3.7.0 > {code:java} > We haven't reached the expected number of members with more than the minQuota > partitions, but no more partitions to be assigned > java.lang.IllegalStateException: We haven't reached the expected number of > members with more than the minQuota partitions, but no more partitions to be > assigned > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.verifyUnfilledMembers(AbstractStickyAssignor.java:820) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.build(AbstractStickyAssignor.java:652) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assignPartitions(AbstractStickyAssignor.java:113) > at > org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:91) > {code} > Here is a specific test case from 3.7.0 that fails when passed to > StickyAssignor.assign: > {code:java} > Cluster(id = cluster-id, nodes = [host-3:1 (id: 4 rack: rack-3), host-3:1 > (id: 3 rack: rack-3), host-2:1 (id: 2 rack: rack-2), host-1:1 (id: 1 rack: > rack-1)], partitions = [Partition(topic = topic_name, partition = 57, leader > = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = > topic_name, partition = 90, leader = 2, replicas = [2], isr = [2], > offlineReplicas = []), Partition(topic = topic_name, partition = 28, leader = > 3, replicas = [3], isr = [3], offlineReplicas = []), Partition(topic = > topic_name, partition = 53, leader = 4, replicas = [4], isr = [4], > offlineReplicas = []), Partition(topic = topic_name, partition = 86, leader = > 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = > topic_name, partition = 24, leader = 4, replicas = [4,3,1], isr = [4,3,1], 
> offlineReplicas = []), Partition(topic = topic_name, partition = 49, leader = > 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = > topic_name, partition = 82, leader = 4, replicas = [4,2], isr = [4,2], > offlineReplicas = []), Partition(topic = topic_name, partition = 20, leader = > 2, replicas = [2,1], isr = [2,1], offlineReplicas = []), Partition(topic = > topic_name, partition = 45, leader = 2, replicas = [2], isr = [2], > offlineReplicas = []), Partition(topic = topic_name, partition = 78, leader = > 1, replicas = [1], isr = [1], offlineReplicas = []), Partition(topic = > topic_name, partition = 16, leader = 4, replicas = [4], isr = [4], > offlineReplicas = []), Partition(topic = topic_name, partition = 41, leader = > 1, replicas = [1,2], isr = [1,2], offlineReplicas = []), Partition(topic = > topic_name, partition = 74, leader = 4, replicas = [4,3,1], isr = [4,3,1], > offlineReplicas = []), Partition(topic = topic_name, partition = 12, leader = > 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = > topic_name, partition = 37, leader = 1, replicas = [1], isr = [1], > offlineReplicas = []), Partition(topic = topic_name, partition = 70, leader = > 2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = > topic_name, parti
[jira] [Comment Edited] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete
[ https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842504#comment-17842504 ] A. Sophie Blee-Goldman edited comment on KAFKA-16644 at 4/30/24 8:57 PM: - [~mjsax] is KAFKA-14778 the correct issue that introduced a regression? That seems to link to an unrelated (and also unresolved) ticket was (Author: ableegoldman): [~mjsax] is KAFKA-14778 the correct issue? It seems to link to an unrelated (and also unresolved) ticket > FK join emits duplicate tombstone on left-side delete > - > > Key: KAFKA-16644 > URL: https://issues.apache.org/jira/browse/KAFKA-16644 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Matthias J. Sax >Priority: Major > > We introduced a regression bug in 3.7.0 release via KAFKA-14778. When a > left-hand side record is deleted, the join now emits two tombstone records > instead of one. > The problem was not detected via unit test, because the tests use a `Map` > instead of a `List` when verifying the result topic records > ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).] > We should update all test cases to use `List` when reading from the output > topic, and of course fix the introduced bug: The > `SubscriptionSendProcessorSupplier` is sending two subscription records > instead of just a single one: > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete
[ https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842504#comment-17842504 ] A. Sophie Blee-Goldman commented on KAFKA-16644: [~mjsax] is KAFKA-14778 the correct issue? It seems to link to an unrelated (and also unresolved) ticket > FK join emits duplicate tombstone on left-side delete > - > > Key: KAFKA-16644 > URL: https://issues.apache.org/jira/browse/KAFKA-16644 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Matthias J. Sax >Priority: Major > > We introduced a regression bug in 3.7.0 release via KAFKA-14778. When a > left-hand side record is deleted, the join now emits two tombstone records > instead of one. > The problem was not detected via unit test, because the tests use a `Map` > instead of a `List` when verifying the result topic records > ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).] > We should update all test cases to use `List` when reading from the output > topic, and of course fix the introduced bug: The > `SubscriptionSendProcessorSupplier` is sending two subscription records > instead of just a single one: > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17841374#comment-17841374 ] A. Sophie Blee-Goldman commented on KAFKA-16514: I opened a quick example PR to showcase more clearly what I'm proposing here. It's definitely rather hacky, but as I said already, this functionality of not leaving the group when a consumer is closed was done in a hacky way to begin with (ie via internal consumer config introduced for use by Kafka Streams only). So we may as well fix this issue so the Streams closeOptions have correct semantics. The more I think about this the more I feel strongly that it's just silly for Streams users to be unable to opt-out of this "don't leave the group on close" behavior. It's not even possible to use the internal config since Streams strictly overrides it inside StreamsConfig. You can work around this by plugging in your own consumers via KafkaClientSupplier, though that does feel a bit extreme. More importantly though, you'd still have to choose up front whether or not to leave the group on close, where you would obviously not know whether it makes sense to leave until you're actually calling close and know _why_ you're calling close (specifically, whether it's a temporary restart/bounce or "permanent" close) > Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup > flag. > --- > > Key: KAFKA-16514 > URL: https://issues.apache.org/jira/browse/KAFKA-16514 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Sal Sorrentino >Priority: Minor > > Working with Kafka Streams 3.7.0, but may affect earlier versions as well. > When attempting to shutdown a streams application and leave the associated > consumer group, the supplied `leaveGroup` option seems to have no effect.
> Sample code: > {code:java} > CloseOptions options = new CloseOptions().leaveGroup(true); > stream.close(options);{code} > The expected behavior here is that the group member would shutdown and leave > the group, immediately triggering a consumer group rebalance. In practice, > the rebalance happens after the appropriate timeout configuration has expired. > I understand the default behavior in that there is an assumption that any > associated StateStores would be persisted to disk and that in the case of a > rolling restart/deployment, the rebalance delay may be preferable. However, > in our application we are using in-memory state stores and standby replicas. > There is no benefit in delaying the rebalance in this setup and we are in > need of a way to force a member to leave the group when shutting down. > The workaround we found is to set an undocumented internal StreamConfig to > enforce this behavior: > {code:java} > props.put("internal.leave.group.on.close", true); > {code} > To state the obvious, this is less than ideal. > Additional configuration details: > {code:java} > Properties props = new Properties(); > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId"); > props.put( > StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9092,localhost:9093,localhost:9094"); > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); > props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors); > props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, > StreamsConfig.EXACTLY_ONCE_V2);{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17841369#comment-17841369 ] A. Sophie Blee-Goldman commented on KAFKA-16514: The immutable internal config thing is definitely a bummer. To recap: if we want to solve this so that the current Streams API – ie the #close(closeOptions) API – works as intended, ie for non-static members as well, we'd need to change the way the consumer works. Or wait for mutable configs, which would be nice, but realistically that's not happening soon enough. To do this "right" we'd probably need to introduce a new public consumer API of some sort which would mean going through a KIP which could be a bit messy. But as a slightly-hacky alternative, would it be possible to just introduce an internal API that works similar to the effect of the existing internal config, and just have Kafka Streams use that internal API without making it a "real" API and having to do a KIP? I mean that's basically what the internal config is anyways – an internal config not exposed to/intended for use by consumer applications and only introduced for Kafka Streams to use. Doesn't seem that big a deal to just switch from this immutable config to a new internal overload of #close (or even an internal #leaveGroupOnClose API that can be toggled on/off). Thoughts? [~mjsax] [~cadonna] maybe you can raise this with someone who works on the clients to see if there are any concerns/make sure no one would object to this approach? -- This message was sent by Atlassian Jira (v8.20.10#820010)
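To make the shape of the internal API floated in the comment above concrete, here is a purely hypothetical sketch. None of these names exist in Kafka; `LeaveGroupToggleConsumer`, `leaveGroupOnClose(boolean)`, and `leftGroup()` are illustrative stand-ins. The idea is that instead of an immutable config read at construction time, the client carries a flag that Streams could flip right before close:

```java
// Hypothetical sketch only -- LeaveGroupToggleConsumer and its methods are
// illustrative stand-ins, not part of the real Kafka consumer API.
public class LeaveGroupToggleConsumer {
    // Plays the role of the immutable "internal.leave.group.on.close" config
    // (which Streams currently fixes up front), made mutable so the decision
    // can be deferred until close() is actually called.
    private boolean leaveGroupOnClose = false;
    private boolean leftGroup = false;

    public void leaveGroupOnClose(boolean leave) {
        this.leaveGroupOnClose = leave;
    }

    public void close() {
        if (leaveGroupOnClose) {
            leftGroup = true; // a real client would send a LeaveGroup request here
        }
        // ... a real client would also release sockets, buffers, etc.
    }

    public boolean leftGroup() {
        return leftGroup;
    }

    public static void main(String[] args) {
        LeaveGroupToggleConsumer consumer = new LeaveGroupToggleConsumer();
        // "Permanent" shutdown: flip the toggle only once we know why we're closing.
        consumer.leaveGroupOnClose(true);
        consumer.close();
        System.out.println(consumer.leftGroup()); // true
    }
}
```

This mirrors why the deferred decision matters: a temporary bounce would simply never call the toggle, keeping group membership alive.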
[jira] [Commented] (KAFKA-16600) Periodically receive "Failed to transition to PENDING_SHUTDOWN, current state is PENDING_SHUTDOWN" during streams close
[ https://issues.apache.org/jira/browse/KAFKA-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839850#comment-17839850 ] A. Sophie Blee-Goldman commented on KAFKA-16600: Thanks [~aleung181] – definitely sounds like a bug. Hard to diagnose without more context though – can you share a bit more of the logs? A minute or so leading up to the error message should hopefully be enough > Periodically receive "Failed to transition to PENDING_SHUTDOWN, current state > is PENDING_SHUTDOWN" during streams close > --- > > Key: KAFKA-16600 > URL: https://issues.apache.org/jira/browse/KAFKA-16600 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.2 >Reporter: Alex Leung >Priority: Minor > > From time to time, we observe the following ERROR message during streams > close: > {code:java} > 2024-04-13 07:40:16,222 INFO o.a.k.s.KafkaStreams [32] stream-client > [testapp-83aa0e09-a6ef-45e4-9393-69dfba4928bf] State transition from RUNNING > to PENDING_SHUTDOWN > 2024-04-13 07:40:16,222 ERROR o.a.k.s.KafkaStreams [55] stream-client > [testapp-83aa0e09-a6ef-45e4-9393-69dfba4928bf] Failed to transition to > PENDING_SHUTDOWN, current state is PENDING_SHUTDOWN > {code} > These ERRORs started showing up when we moved from 2.7.2 to 3.3.2 and we have > not changed any code related to streams shutdown. > When the problem does not occur (most of the time), it looks like the > following: > {code:java} > 2024-04-13 07:40:11,333 INFO o.a.k.s.KafkaStreams [55] stream-client > [testapp-e2212b31-e9c2-4c75-92f6-1e557e27bf66] State transition from RUNNING > to PENDING_SHUTDOWN > 2024-04-13 07:40:11,341 INFO o.a.k.s.KafkaStreams [32] stream-client > [testapp-e2212b31-e9c2-4c75-92f6-1e557e27bf66] Streams client is in > PENDING_SHUTDOWN, all resources are being closed and the client will be > stopped. {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
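One plausible way the duplicate transition above arises (an assumption for illustration, not a confirmed diagnosis of KAFKA-16600) is two code paths racing to move the client into PENDING_SHUTDOWN, e.g. close() invoked concurrently from a shutdown hook and the main thread. A compare-and-set guard shows how only the first caller wins and the loser sees exactly the reported condition:

```java
import java.util.concurrent.atomic.AtomicReference;

public class ShutdownRace {
    enum State { RUNNING, PENDING_SHUTDOWN, NOT_RUNNING }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

    // Returns true only for the first caller; a losing racer is the one that
    // would log "Failed to transition to PENDING_SHUTDOWN, current state is
    // PENDING_SHUTDOWN".
    public boolean setPendingShutdown() {
        return state.compareAndSet(State.RUNNING, State.PENDING_SHUTDOWN);
    }

    public static void main(String[] args) {
        ShutdownRace client = new ShutdownRace();
        System.out.println(client.setPendingShutdown()); // true: first close() wins
        System.out.println(client.setPendingShutdown()); // false: second caller finds PENDING_SHUTDOWN
    }
}
```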
[jira] [Commented] (KAFKA-16566) Update static membership fencing system test to support new protocol
[ https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837820#comment-17837820 ] A. Sophie Blee-Goldman commented on KAFKA-16566: I'm a bit confused – I thought we haven't migrated Streams over to the new consumer rebalancing protocol. Or is this referring to something else? What is the "classic" vs "consumer" protocol? And when/why did we migrate our system tests to using it? Does it have to do with static membership specifically? Sorry for being out of the loop here > Update static membership fencing system test to support new protocol > > > Key: KAFKA-16566 > URL: https://issues.apache.org/jira/browse/KAFKA-16566 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer > that verifies the sequence in which static members join a group when using > conflicting instance id. This behaviour is different in the classic and > consumer protocol, so the tests should be updated to set the right > expectations when running with the new consumer protocol. Note that what the > tests covers (params, setup), apply to both protocols. It is the expected > results that are not the same. > When conflicts between static members joining a group: > Classic protocol: all members join the group with the same group instance id, > and then the first one will eventually receive a HB error with > FencedInstanceIdException > Consumer protocol: new member with an instance Id already in use is not able > to join, receiving an UnreleasedInstanceIdException in the response to the HB > to join the group. -- This message was sent by Atlassian Jira (v8.20.10#820010)
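The classic-vs-consumer difference described in the ticket can be sketched as a toy group coordinator (illustrative only; `GroupSim` and its result strings are stand-ins, not Kafka classes or exceptions): under the classic protocol a newcomer with a conflicting `group.instance.id` displaces the old member, which is later fenced with FencedInstanceIdException, while under the new consumer protocol the join itself is rejected with UnreleasedInstanceIdException:

```java
import java.util.HashMap;
import java.util.Map;

public class GroupSim {
    public enum Protocol { CLASSIC, CONSUMER }

    private final Protocol protocol;
    // group.instance.id -> current member id
    private final Map<String, String> members = new HashMap<>();

    public GroupSim(Protocol protocol) { this.protocol = protocol; }

    /** Outcome for a static member joining with the given instance id. */
    public String join(String instanceId, String memberId) {
        String current = members.get(instanceId);
        if (current == null) {
            members.put(instanceId, memberId);
            return "JOINED";
        }
        if (protocol == Protocol.CLASSIC) {
            // Classic: newcomer takes over; the old member's next heartbeat
            // fails (FencedInstanceIdException in the real protocol).
            members.put(instanceId, memberId);
            return "JOINED_OLD_MEMBER_FENCED";
        }
        // Consumer protocol: the instance id is still in use, so the join
        // is rejected (UnreleasedInstanceIdException in the real protocol).
        return "UNRELEASED_INSTANCE_ID";
    }

    public static void main(String[] args) {
        GroupSim classic = new GroupSim(Protocol.CLASSIC);
        classic.join("instance-1", "member-A");
        System.out.println(classic.join("instance-1", "member-B")); // JOINED_OLD_MEMBER_FENCED

        GroupSim consumer = new GroupSim(Protocol.CONSUMER);
        consumer.join("instance-1", "member-A");
        System.out.println(consumer.join("instance-1", "member-B")); // UNRELEASED_INSTANCE_ID
    }
}
```

Same test parameters and setup, different expected results, which is exactly what the system-test update covers.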
[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836736#comment-17836736 ] A. Sophie Blee-Goldman commented on KAFKA-16514: I haven't gone back and re-read the KIP, but IIRC the reason for adding these CloseOptions was specific to solving an issue with static membership, hence why it only takes effect there. That said – I completely agree that there's no reason why this should only work with static membership, and the decision to not leave the group for non-static-membership is one of those biases that Kafka Streams has in assuming persistent state stores. I would fully support changing the behavior to work with non-static-members rather than just updating the javadocs to explain this. [~mjsax] would this need a KIP? Or can we just consider this a "bug" (especially since the javadocs make no mention that it's intended to only work on static members) and since we don't need any API changes, simply make the change without a KIP? Either way – [~sal.sorrentino] would you be interested in picking this up? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-12506) Expand AdjustStreamThreadCountTest
[ https://issues.apache.org/jira/browse/KAFKA-12506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833758#comment-17833758 ] A. Sophie Blee-Goldman commented on KAFKA-12506: [~goyarpit] yes, it seems this ticket has been abandoned. Feel free to pick it up (and let me know if you have any questions) [~kebab-mai-haddi] if you still want to work on this, just let me know, I'm sure there are multiple improvements that could be made here in parallel > Expand AdjustStreamThreadCountTest > -- > > Key: KAFKA-12506 > URL: https://issues.apache.org/jira/browse/KAFKA-12506 > Project: Kafka > Issue Type: Sub-task > Components: streams, unit tests >Reporter: A. Sophie Blee-Goldman >Assignee: Arpit Goyal >Priority: Major > Labels: newbie, newbie++ > > Right now the AdjustStreamThreadCountTest runs a minimal topology that just > consumes a single input topic, and doesn't produce any data to this topic. > Some of the complex concurrency bugs that we've found only showed up when we > had some actual data to process and a stateful topology: > [KAFKA-12503|https://issues.apache.org/jira/browse/KAFKA-12503] KAFKA-12500 > See the umbrella ticket for the list of improvements we need to make this a > more effective integration test -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-12506) Expand AdjustStreamThreadCountTest
[ https://issues.apache.org/jira/browse/KAFKA-12506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reassigned KAFKA-12506: -- Assignee: Arpit Goyal (was: Aviral Srivastava) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16458) Add contains method in KeyValue store interface
[ https://issues.apache.org/jira/browse/KAFKA-16458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833749#comment-17833749 ] A. Sophie Blee-Goldman commented on KAFKA-16458: This could be nice to have, but unfortunately given the overly complex way that state stores are implemented and the multi-layer hierarchy, something that should be simple – ie just adding a basic API to the StateStore interface – actually ends up being a massive amount of work. We do plan to overhaul the state store hierarchy at some point in order to simplify the codebase, both for maintenance and new features, although there's no clear roadmap or promise for this to happen anytime soon. That said, I would personally suggest we hold off on adding any new APIs that don't add strictly-new functionality until after we've simplified the state store implementation. Of course, if this is something you really want, you're always free to kick off a KIP discussion whenever. Just wanted to provide some context and warn that this would not be as straightforward as it might seem to actually implement. To your final question: I do think in some sense the reality is that yes, this API is not offered on purpose, in order to keep the interface as simple as possible. But this in itself would be less of a concern if the state store hierarchy was not such a hassle to expand and maintain, which is why I think the community would be open to it after we can get around to cleaning up the store implementation. > Add contains method in KeyValue store interface > --- > > Key: KAFKA-16458 > URL: https://issues.apache.org/jira/browse/KAFKA-16458 > Project: Kafka > Issue Type: Wish > Components: streams >Reporter: Ayoub Omari >Priority: Minor > Labels: needs-kip > > In some stream processors, we sometimes just want to check if a key exists in > the state store or not. 
> > I find calling .get() and checking if the return value is null a little bit > verbose > {code:java} > if (store.get(key) != null) { > }{code} > > But I am not sure if it is on purpose that we would like to keep the store > interface simple. -- This message was sent by Atlassian Jira (v8.20.10#820010)
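For what it's worth, at the interface level the requested feature would be a one-line default method; the complexity the comment above alludes to lives in threading it through the real multi-layer store hierarchy. A simplified stand-in (`KVStore` and `InMemoryKVStore` here are illustrative, not the real `KeyValueStore` interface or its implementations):

```java
import java.util.HashMap;
import java.util.Map;

interface KVStore<K, V> {
    V get(K key);

    // The requested convenience: trivial as a default method on a simplified
    // interface, but costly to wire through the real store hierarchy.
    default boolean contains(K key) {
        return get(key) != null;
    }
}

class InMemoryKVStore<K, V> implements KVStore<K, V> {
    private final Map<K, V> map = new HashMap<>();

    public void put(K key, V value) { map.put(key, value); }

    @Override
    public V get(K key) { return map.get(key); }
}

public class ContainsDemo {
    public static void main(String[] args) {
        InMemoryKVStore<String, Integer> store = new InMemoryKVStore<>();
        store.put("a", 1);
        System.out.println(store.contains("a")); // true
        System.out.println(store.contains("b")); // false
    }
}
```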
[jira] [Commented] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group
[ https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17820880#comment-17820880 ] A. Sophie Blee-Goldman commented on KAFKA-16277: FYI I added you as a contributor so you should be able to self-assign tickets from now on. Thanks again for contributing a fix! > CooperativeStickyAssignor does not spread topics evenly among consumer group > > > Key: KAFKA-16277 > URL: https://issues.apache.org/jira/browse/KAFKA-16277 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Cameron Redpath >Assignee: Cameron Redpath >Priority: Major > Fix For: 3.8.0 > > Attachments: image-2024-02-19-13-00-28-306.png > > > Consider the following scenario: > `topic-1`: 12 partitions > `topic-2`: 12 partitions > > Of note, `topic-1` gets approximately 10 times more messages through it than > `topic-2`. > > Both of these topics are consumed by a single application, single consumer > group, which scales under load. Each member of the consumer group subscribes > to both topics. The `partition.assignment.strategy` being used is > `org.apache.kafka.clients.consumer.CooperativeStickyAssignor`. The > application may start with one consumer. It consumes all partitions from both > topics. > > The problem begins when the application scales up to two consumers. What is > seen is that all partitions from `topic-1` go to one consumer, and all > partitions from `topic-2` go to the other consumer. In the case with one > topic receiving more messages than the other, this results in a very > imbalanced group where one consumer is receiving 10x the traffic of the other > due to partition assignment. > > This is the issue being seen in our cluster at the moment. See this graph of > the number of messages being processed by each consumer as the group scales > from one to four consumers: > !image-2024-02-19-13-00-28-306.png|width=537,height=612! 
> Things to note from this graphic: > * With two consumers, the partitions for a topic all go to a single consumer > each > * With three consumers, the partitions for a topic are split between two > consumers each > * With four consumers, the partitions for a topic are split between three > consumers each > * The total number of messages being processed by each consumer in the group > is very imbalanced throughout the entire period > > With regard to the number of _partitions_ being assigned to each consumer, > the group is balanced. However, the assignment appears to be biased so that > partitions from the same topic go to the same consumer. In our scenario, this > leads to very undesirable partition assignment. > > I question if the behaviour of the assignor should be revised, so that each > topic has its partitions maximally spread across all available members of the > consumer group. In the above scenario, this would result in much more even > distribution of load. The behaviour would then be: > * With two consumers, 6 partitions from each topic go to each consumer > * With three consumers, 4 partitions from each topic go to each consumer > * With four consumers, 3 partitions from each topic go to each consumer > > Of note, we only saw this behaviour after migrating to the > `CooperativeStickyAssignor`. It was not an issue with the default partition > assignment strategy. > > It is possible this may be intended behaviour. In which case, what is the > preferred workaround for our scenario? Our current workaround if we decide to > go ahead with the update to `CooperativeStickyAssignor` may be to limit our > consumers so they only subscribe to one topic, and have two consumer threads > per instance of the application. -- This message was sent by Atlassian Jira (v8.20.10#820010)
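The target distribution proposed in the report ("each topic's partitions maximally spread across members") can be stated precisely as a small piece of arithmetic. This sketch computes the per-topic share for the 12-partition scenario above; it is pure bookkeeping, not the assignor's actual algorithm:

```java
public class PerTopicSpread {
    // Number of one topic's partitions that consumer `index` (0-based) should
    // receive when that topic is spread maximally evenly across `consumers`.
    public static int share(int partitions, int consumers, int index) {
        return partitions / consumers + (index < partitions % consumers ? 1 : 0);
    }

    public static void main(String[] args) {
        int partitionsPerTopic = 12;
        System.out.println(share(partitionsPerTopic, 2, 0)); // 6 of each topic per consumer
        System.out.println(share(partitionsPerTopic, 3, 0)); // 4 of each topic per consumer
        System.out.println(share(partitionsPerTopic, 4, 0)); // 3 of each topic per consumer
        // Uneven case: with 5 consumers the first two get 3, the rest get 2.
        System.out.println(share(partitionsPerTopic, 5, 0)); // 3
        System.out.println(share(partitionsPerTopic, 5, 4)); // 2
    }
}
```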
[jira] [Resolved] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group
[ https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-16277. Resolution: Fixed -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group
[ https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-16277: --- Fix Version/s: 3.8.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group
[ https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reassigned KAFKA-16277: -- Assignee: Cameron Redpath -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16295) Align RocksDB and in-memory store init() sequence
[ https://issues.apache.org/jira/browse/KAFKA-16295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-16295: --- Labels: newbie (was: ) > Align RocksDB and in-memory store init() sequence > - > > Key: KAFKA-16295 > URL: https://issues.apache.org/jira/browse/KAFKA-16295 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Minor > Labels: newbie > > Cf [https://lists.apache.org/thread/f4z1vmpb21xhyxl6966xtcb3958fyx5d] > {quote}For RocksDB stores, we open the store first and then call #register, > [...] However the in-memory store actually registers itself *first*, before > marking itself as open,[..]. > I suppose it would make sense to align the two store implementations to have > the same behavior, and the in-memory store is probably technically more > correct. > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16295) Align RocksDB and in-memory store init() sequence
[ https://issues.apache.org/jira/browse/KAFKA-16295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-16295: --- Summary: Align RocksDB and in-memory store init() sequence (was: Align RocksDB and in-memory store inti() sequence) > Align RocksDB and in-memory store init() sequence > - > > Key: KAFKA-16295 > URL: https://issues.apache.org/jira/browse/KAFKA-16295 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Minor > > Cf [https://lists.apache.org/thread/f4z1vmpb21xhyxl6966xtcb3958fyx5d] > {quote}For RocksDB stores, we open the store first and then call #register, > [...] However the in-memory store actually registers itself *first*, before > marking itself as open,[..]. > I suppose it would make sense to align the two store implementations to have > the same behavior, and the in-memory store is probably technically more > correct. > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14597) [Streams] record-e2e-latency-max is not reporting correct metrics
[ https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818599#comment-17818599 ] A. Sophie Blee-Goldman commented on KAFKA-14597: Hey [~atuljainiitk] I just came across this ticket for the first time, but for the most part I think your analysis makes sense. Are you still interested in picking this up? Specifically, reverting the linked change and getting the true system time for terminal nodes? It's been a while since that change was made/the KIP was implemented, so I don't remember all the context, but fetching the current time at terminal nodes sounds reasonable to me. Clearly the metric is otherwise useless, so we should either update it to be correct (and monitor for any potential performance impact) or just remove it entirely. Fixing it is obviously preferable, at least unless we know for sure that it actually hurts performance. I'm optimistic, though. cc also [~talestonini] [~cadonna] > [Streams] record-e2e-latency-max is not reporting correct metrics > -- > > Key: KAFKA-14597 > URL: https://issues.apache.org/jira/browse/KAFKA-14597 > Project: Kafka > Issue Type: Bug > Components: metrics, streams > Reporter: Atul Jain > Assignee: Tales Tonini > Priority: Major > Attachments: image-2023-03-21-15-07-24-352.png, > image-2023-03-21-19-01-54-713.png, image-2023-03-21-19-03-07-525.png, > image-2023-03-21-19-03-28-625.png, process-latency-max.jpg, > record-e2e-latency-max.jpg > > > I was following this KIP documentation > ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-613%3A+Add+end-to-end+latency+metrics+to+Streams]) > and the Kafka Streams documentation > ([https://kafka.apache.org/documentation/#kafka_streams_monitoring:~:text=node%2Did%3D(%5B%2D.%5Cw%5D%2B)-,record%2De2e%2Dlatency%2Dmax,-The%20maximum%20end]).
Based on this documentation, the *record-e2e-latency-max* metric should
> monitor the full end-to-end latency, which includes both *consumption
> latency* and {*}processing delay{*}.
> However, based on my observations, record-e2e-latency-max seems to be
> measuring only the consumption latency. Processing delays can be measured
> using *process-latency-max*. I am checking all this using a simple topology
> consisting of a source, processors, and a sink (code added). I have added
> some sleep time (of 3 seconds) in one of the processors to ensure some delay
> in the processing logic. These delays are not accounted for in
> record-e2e-latency-max but are accounted for in process-latency-max.
> process-latency-max was observed to be 3002 ms, which accounts for the sleep
> time of 3 seconds. However, record-e2e-latency-max observed in JConsole is
> 422 ms, which does not account for the 3 seconds of sleep time.
> 
> Code describing my topology:
> {code:java}
> static Topology buildTopology(String inputTopic, String outputTopic) {
>     log.info("Input topic: " + inputTopic + " and output topic: " + outputTopic);
>     Serde<String> stringSerde = Serdes.String();
>     StreamsBuilder builder = new StreamsBuilder();
>     builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde))
>             .peek((k, v) -> log.info("Observed event: key " + k + " value: " + v))
>             .mapValues(s -> {
>                 try {
>                     System.out.println("sleeping for 3 seconds");
>                     Thread.sleep(3000);
>                 } catch (InterruptedException e) {
>                     e.printStackTrace();
>                 }
>                 return s.toUpperCase();
>             })
>             .peek((k, v) -> log.info("Transformed event: key " + k + " value: " + v))
>             .to(outputTopic, Produced.with(stringSerde, stringSerde));
>     return builder.build();
> }
> {code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
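The discrepancy the reporter measured (process-latency-max of 3002 ms versus record-e2e-latency-max of 422 ms) comes down to which observation timestamp gets subtracted from the record's event timestamp. A minimal sketch of that arithmetic, with hypothetical names and timestamps, not the actual Streams metric code:

```java
public class E2eLatencyDemo {
    // End-to-end latency is (observation time - record event timestamp).
    // Observing *before* the processor runs captures only consumption
    // latency; observing at the terminal node also captures processing
    // delay, such as the 3-second sleep in the reporter's topology.
    static long e2eLatencyMs(long recordTimestampMs, long observedAtMs) {
        return observedAtMs - recordTimestampMs;
    }

    public static void main(String[] args) {
        long recordTs = 10_000;                       // event time at the source
        long arrivalAtNode = 10_422;                  // before the 3s sleep
        long exitFromTerminalNode = arrivalAtNode + 3_000;
        System.out.println(e2eLatencyMs(recordTs, arrivalAtNode));        // 422
        System.out.println(e2eLatencyMs(recordTs, exitFromTerminalNode)); // 3422
    }
}
```

Only the second measurement would reflect the processing delay the reporter added, which matches the proposal in the comment above to fetch the true system time at terminal nodes.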
[jira] [Commented] (KAFKA-16277) CooperativeStickyAssignor does not spread topics evenly among consumer group
[ https://issues.apache.org/jira/browse/KAFKA-16277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818594#comment-17818594 ] A. Sophie Blee-Goldman commented on KAFKA-16277: Hey [~credpath-seek] – first up, while it's been a while since I've looked at the sticky assignor code, I'm not too surprised that this might be the case. The obvious emphasis (per the name) was put on "stickiness" and partition-number balance, with good data parallelism ie topic-level balance being best-effort at most. That said, I suspect the assignor could be making a better effort. Presumably what is happening is that during the phase where it attempts to re-assign previously-owned partitions back to their former owner, we make a pass over a sorted list of previously-owned partitions that is grouped by topic. The assignor will then assign partitions from this list one-by-one to its previous owner until it hits the expected total number of partitions. So in the scenario you describe, it's basically looping over (t1p0, t1p1, t1p2, t1p3...t1pN, t2p0, t2p1, t2p2...t2pN) and assigning the first N partitions to the first consumer, which would be everything from topic 1, then just dumping the remaining partitions – all of which belong to topic 2 – onto the new consumer. The fix should be fairly simple – we just need to group this sorted list by partition, rather than by topic (ie t1p0, t2p0, t1p1, t2p1...t1pN, t2pN). Would you be interested in submitting a patch for this? As for what you can do right now: technically even if a fix for this was merged, you'd have to wait for the next release. However, the assignment is technically completely customizable, so in theory you could just copy/paste all the code from the patched assignor into a custom ConsumerPartitionAssignor implementation and then plug that in instead of the "cooperative-sticky" assignment strategy. 
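The re-ordering suggested above (group the sorted candidate list by partition index rather than by topic) can be sketched as follows. This is a hypothetical illustration of the idea, not the assignor's actual code:

```java
import java.util.*;

public class InterleaveDemo {
    // Instead of a candidate list grouped by topic (t1p0..t1pN, t2p0..t2pN),
    // interleave partitions by index (t1p0, t2p0, t1p1, t2p1, ...). Any
    // contiguous slice handed to one consumer then draws partitions from
    // every topic, giving topic-level balance as well as numeric balance.
    static List<String> interleaveByPartition(Map<String, Integer> partitionCounts) {
        List<String> topics = new ArrayList<>(partitionCounts.keySet());
        Collections.sort(topics);
        int maxPartitions = Collections.max(partitionCounts.values());
        List<String> ordered = new ArrayList<>();
        for (int p = 0; p < maxPartitions; p++) {
            for (String topic : topics) {
                if (p < partitionCounts.get(topic)) {
                    ordered.add(topic + "-" + p);
                }
            }
        }
        return ordered;
    }

    public static void main(String[] args) {
        // Two topics with 12 partitions each, as in the reported scenario.
        List<String> ordered = interleaveByPartition(Map.of("topic-1", 12, "topic-2", 12));
        // The first consumer's 12-partition slice now holds 6 from each topic.
        System.out.println(ordered.subList(0, 12));
    }
}
```

With the topic-grouped order, the first consumer's 12-partition slice would have been all of `topic-1`; with the interleaved order it gets 6 partitions from each topic.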
Otherwise, the workaround you suggest is a reasonable backup – the obvious downside being that the two threads will have an unbalanced load between them, but at least the overall node-level workload will be more even > CooperativeStickyAssignor does not spread topics evenly among consumer group > > > Key: KAFKA-16277 > URL: https://issues.apache.org/jira/browse/KAFKA-16277 > Project: Kafka > Issue Type: Bug > Components: clients, consumer > Reporter: Cameron Redpath > Priority: Major > Attachments: image-2024-02-19-13-00-28-306.png > > > Consider the following scenario: > `topic-1`: 12 partitions > `topic-2`: 12 partitions > > Of note, `topic-1` gets approximately 10 times more messages through it than > `topic-2`. > > Both of these topics are consumed by a single application, single consumer > group, which scales under load. Each member of the consumer group subscribes > to both topics. The `partition.assignment.strategy` being used is > `org.apache.kafka.clients.consumer.CooperativeStickyAssignor`. The > application may start with one consumer. It consumes all partitions from both > topics. > > The problem begins when the application scales up to two consumers. What is > seen is that all partitions from `topic-1` go to one consumer, and all > partitions from `topic-2` go to the other consumer. In the case with one > topic receiving more messages than the other, this results in a very > imbalanced group where one consumer is receiving 10x the traffic of the other > due to partition assignment. > > This is the issue being seen in our cluster at the moment. See this graph of > the number of messages being processed by each consumer as the group scales > from one to four consumers: > !image-2024-02-19-13-00-28-306.png|width=537,height=612! 
> Things to note from this graphic: > * With two consumers, the partitions for a topic all go to a single consumer > each > * With three consumers, the partitions for a topic are split between two > consumers each > * With four consumers, the partitions for a topic are split between three > consumers each > * The total number of messages being processed by each consumer in the group > is very imbalanced throughout the entire period > > With regard to the number of _partitions_ being assigned to each consumer, > the group is balanced. However, the assignment appears to be biased so that > partitions from the same topic go to the same consumer. In our scenario, this > leads to very undesirable partition assignment. > > I question if the behaviour of the assignor should be revised, so that each > topic has its partitions maximally spread across all available members of the > consumer group. In the above scenario, this would result in much more even > distribution of load. The behaviour would then be: > * With two
[jira] [Commented] (KAFKA-16241) Kafka Streams hits IllegalStateException trying to recycle a task
[ https://issues.apache.org/jira/browse/KAFKA-16241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816234#comment-17816234 ] A. Sophie Blee-Goldman commented on KAFKA-16241: Off-topic but I opened the logs for a second and there is a huge amount of spam. I know everyone is probably aware of the "Detected out-of-order KTable update" logspam by now, but there was over 100MB from that one log message alone (for a single node)...yikes Something that's new, or new to me at least, is this: {code:java} [2024-02-09 03:23:55,168] WARN [i-0fede2697f39580f9-StreamThread-1] stream-thread [i-0fede2697f39580f9-StreamThread-1] Detected that shutdown was requested. All clients in this app will now begin to shutdown (org.apache.kafka.streams.processor.internals.StreamThread) [2024-02-09 03:23:55,168] INFO [i-0fede2697f39580f9-StreamThread-1] [Consumer instanceId=ip-172-31-14-207-1, clientId=i-0fede2697f39580f9-StreamThread-1-consumer, groupId=stream-soak-test] Request joining group due to: Shutdown requested (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) {code} Those two lines are logged repeatedly (and excessively) in a very tight loop at the end of the logs. Filtering those two lines out dropped the file size by another 200MB...that's a lot!! And it's all from about 10 seconds of real time. So it's a VERY busy loop... > Kafka Streams hits IllegalStateException trying to recycle a task > - > > Key: KAFKA-16241 > URL: https://issues.apache.org/jira/browse/KAFKA-16241 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.1 >Reporter: Matthias J. Sax >Priority: Major > Attachments: streams-1.zip, streams-2.zip, streams-3.zip > > > Running with EOS-v2 (not sure if relevant or not) and hitting: > {code:java} > [2024-02-08 20:57:42,325] ERROR [i-0fede2697f39580f9-StreamThread-1] > stream-thread [i-0fede2697f39580f9-StreamThread-1] Failed to recycle task 1_0 > cleanly. 
Attempting to close remaining tasks before re-throwing: > (org.apache.kafka.streams.processor.internals.TaskManager) > java.lang.IllegalStateException: Illegal state RESTORING while recycling > active task 1_0 > at > org.apache.kafka.streams.processor.internals.StreamTask.prepareRecycle(StreamTask.java:582) > at > org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createStandbyTaskFromActive(StandbyTaskCreator.java:125) > at > org.apache.kafka.streams.processor.internals.TaskManager.convertActiveToStandby(TaskManager.java:675) > at > org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:651) > at > org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350) > at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:954) > at > 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579) > {code} > Logs of all three KS instances attached. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16055) Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders
[ https://issues.apache.org/jira/browse/KAFKA-16055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reassigned KAFKA-16055: -- Assignee: Kohei Nozaki (was: Kohei Nozaki) > Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders > > > Key: KAFKA-16055 > URL: https://issues.apache.org/jira/browse/KAFKA-16055 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.1 >Reporter: Kohei Nozaki >Assignee: Kohei Nozaki >Priority: Minor > Labels: newbie, newbie++ > Fix For: 3.8.0 > > > This was originally raised in [a kafka-users > post|https://lists.apache.org/thread/gpct1275bfqovlckptn3lvf683qpoxol]. > There is a HashMap stored in QueryableStoreProvider#storeProviders ([code > link|https://github.com/apache/kafka/blob/3.6.1/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L39]) > which can be mutated by a KafkaStreams#removeStreamThread() call. This can > be problematic when KafkaStreams#store is called from a separate thread. > We need to somehow make this part of code thread-safe by replacing it by > ConcurrentHashMap or/and using an existing locking mechanism. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16055) Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders
[ https://issues.apache.org/jira/browse/KAFKA-16055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reassigned KAFKA-16055: -- Assignee: Kohei Nozaki > Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders > > > Key: KAFKA-16055 > URL: https://issues.apache.org/jira/browse/KAFKA-16055 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.1 >Reporter: Kohei Nozaki >Assignee: Kohei Nozaki >Priority: Minor > Labels: newbie, newbie++ > Fix For: 3.8.0 > > > This was originally raised in [a kafka-users > post|https://lists.apache.org/thread/gpct1275bfqovlckptn3lvf683qpoxol]. > There is a HashMap stored in QueryableStoreProvider#storeProviders ([code > link|https://github.com/apache/kafka/blob/3.6.1/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L39]) > which can be mutated by a KafkaStreams#removeStreamThread() call. This can > be problematic when KafkaStreams#store is called from a separate thread. > We need to somehow make this part of code thread-safe by replacing it by > ConcurrentHashMap or/and using an existing locking mechanism. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16055) Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders
[ https://issues.apache.org/jira/browse/KAFKA-16055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-16055: --- Fix Version/s: 3.8.0 > Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders > > > Key: KAFKA-16055 > URL: https://issues.apache.org/jira/browse/KAFKA-16055 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.1 >Reporter: Kohei Nozaki >Priority: Minor > Labels: newbie, newbie++ > Fix For: 3.8.0 > > > This was originally raised in [a kafka-users > post|https://lists.apache.org/thread/gpct1275bfqovlckptn3lvf683qpoxol]. > There is a HashMap stored in QueryableStoreProvider#storeProviders ([code > link|https://github.com/apache/kafka/blob/3.6.1/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L39]) > which can be mutated by a KafkaStreams#removeStreamThread() call. This can > be problematic when KafkaStreams#store is called from a separate thread. > We need to somehow make this part of code thread-safe by replacing it by > ConcurrentHashMap or/and using an existing locking mechanism. -- This message was sent by Atlassian Jira (v8.20.10#820010)
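The failure mode in this ticket can be reproduced in miniature: iterating a plain HashMap while another code path removes an entry (as KafkaStreams#removeStreamThread() can) throws ConcurrentModificationException, whereas ConcurrentHashMap's weakly consistent iterators tolerate concurrent removal. A minimal single-threaded sketch with hypothetical names; the removal happens inside the iteration loop to make the race deterministic:

```java
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

public class StoreProvidersDemo {
    // Iterates the providers map while removing an entry mid-iteration,
    // simulating removeStreamThread() racing with a KafkaStreams#store call.
    // Returns true if iteration survives, false on ConcurrentModificationException.
    static boolean iterateWhileRemoving(Map<String, String> providers) {
        providers.put("thread-1", "provider-1");
        providers.put("thread-2", "provider-2");
        providers.put("thread-3", "provider-3");
        try {
            for (String key : providers.keySet()) {
                providers.remove("thread-2"); // structural modification mid-iteration
            }
            return true;
        } catch (ConcurrentModificationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(iterateWhileRemoving(new HashMap<>()));           // fails fast
        System.out.println(iterateWhileRemoving(new ConcurrentHashMap<>())); // weakly consistent
    }
}
```

Note that ConcurrentHashMap only removes the fail-fast exception; if callers need a consistent snapshot of the providers during iteration, the existing locking mechanism mentioned in the ticket would still be required.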
[jira] [Updated] (KAFKA-14404) Fix & update docs on client configs controlled by Streams
[ https://issues.apache.org/jira/browse/KAFKA-14404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-14404: --- Fix Version/s: 3.8.0 > Fix & update docs on client configs controlled by Streams > - > > Key: KAFKA-14404 > URL: https://issues.apache.org/jira/browse/KAFKA-14404 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Ayoub Omari >Priority: Major > Labels: docs, newbie > Fix For: 3.8.0 > > > There are a handful of client configs that can't be set by Streams users for > various reasons, such as the group id, but we seem to have missed a few of > them in the documentation > [here|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]: > the partitioner assignor (Consumer) and partitioner (Producer). > This section of the docs also just needs to be cleaned up in general as there > is overlap between the [Default > Values|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#default-values] > and [Parameters controlled by Kafka > Streams|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26] > sections, and the table of contents is messed up presumably due to an issue > with the section headers. > We should separate these with one section covering (only) configs where > Streams sets a different default but this can still be overridden by the > user, and the other section covering the configs that Streams hardcodes and > users can never override. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16089) Kafka Streams still leaking memory
[ https://issues.apache.org/jira/browse/KAFKA-16089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-16089: --- Fix Version/s: 3.8.0 > Kafka Streams still leaking memory > -- > > Key: KAFKA-16089 > URL: https://issues.apache.org/jira/browse/KAFKA-16089 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.8.0 >Reporter: Lucas Brutschy >Assignee: Nicholas Telford >Priority: Critical > Fix For: 3.8.0 > > Attachments: fix.png, graphviz (1).svg, unfix.png > > > In > [https://github.com/apache/kafka/commit/58d6d2e5922df60cdc5c5c1bcd1c2d82dd2b71e2] > a leak was fixed in the release candidate for 3.7. > > However, Kafka Streams still seems to be leaking memory (just slower) after > the fix. > > Attached is the `jeprof` output right before a crash after ~11 hours. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16089) Kafka Streams still leaking memory
[ https://issues.apache.org/jira/browse/KAFKA-16089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805801#comment-17805801 ] A. Sophie Blee-Goldman commented on KAFKA-16089: Yeah, nice investigation! That was a tricky one > Kafka Streams still leaking memory > -- > > Key: KAFKA-16089 > URL: https://issues.apache.org/jira/browse/KAFKA-16089 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.8.0 >Reporter: Lucas Brutschy >Assignee: Nicholas Telford >Priority: Critical > Attachments: fix.png, graphviz (1).svg, unfix.png > > > In > [https://github.com/apache/kafka/commit/58d6d2e5922df60cdc5c5c1bcd1c2d82dd2b71e2] > a leak was fixed in the release candidate for 3.7. > > However, Kafka Streams still seems to be leaking memory (just slower) after > the fix. > > Attached is the `jeprof` output right before a crash after ~11 hours. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16025) Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804498#comment-17804498 ] A. Sophie Blee-Goldman commented on KAFKA-16025: FYI [~sabitn] I added you as a Jira contributor so you should now be able to self-assign any tickets you are working on. Thanks again for the fix! > Streams StateDirectory has orphaned locks after rebalancing, blocking future > rebalancing > > > Key: KAFKA-16025 > URL: https://issues.apache.org/jira/browse/KAFKA-16025 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.4.0 > Environment: Linux >Reporter: Sabit >Assignee: Sabit >Priority: Major > Fix For: 3.8.0 > > > Hello, > > We are encountering an issue where during rebalancing, we see streams threads > on one client get stuck in rebalancing. Upon enabling debug logs, we saw that > some tasks were having issues initializing due to failure to grab a lock in > the StateDirectory: > > {{2023-12-14 22:51:57.352000Z stream-thread > [i-0f1a5e7a42158e04b-StreamThread-14] Could not initialize task 0_51 since: > stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] standby-task [0_51] > Failed to lock the state directory for task 0_51; will retry}} > > We were able to reproduce this behavior reliably on 3.4.0. This is the > sequence that triggers the bug. > Assume in a streams consumer group, there are 5 instances (A, B, C, D, E), > each with 5 threads (1-5), and the consumer is using stateful tasks which > have state stores on disk. There are 10 active tasks and 10 standby tasks. 
> # Instance A is deactivated
> # As an example, let's say task 0_1, previously on instance B, moves to
> instance C
> # Task 0_1 leaves behind its state directory on Instance B's disk,
> currently unused, and no lock for it exists in Instance B's StateDirectory
> in-memory lock tracker
> # Instance A is re-activated
> # Streams thread 1 on Instance B is asked to re-join the consumer group due
> to a new member being added
> # As part of re-joining, thread 1 lists non-empty state directories in order
> to report the offsets it has in its state stores as part of its metadata.
> Thread 1 sees that the directory for 0_1 is not empty.
> # The cleanup thread on instance B runs. The cleanup thread locks state
> store 0_1, sees the directory for 0_1 was last modified more than
> `state.cleanup.delay.ms` ago, deletes it, and unlocks it successfully
> # Thread 1 takes a lock on directory 0_1 due to it being found not-empty
> before, unaware that the cleanup has run between the time of the check and
> the lock. It tracks this lock in its own in-memory store, in addition to
> StateDirectory's in-memory lock store
> # Thread 1 successfully joins the consumer group
> # After every consumer in the group joins the group, assignments are
> calculated, and then every consumer calls sync group to receive the new
> assignments
> # Thread 1 on Instance B calls sync group but gets an error - the group
> coordinator has triggered a new rebalance and all members must rejoin the
> group
> # Thread 1 again lists non-empty state directories in order to report the
> offsets it has in its state stores as part of its metadata. Prior to doing
> so, it clears its in-memory store tracking the locks it has taken for the
> purpose of gathering rebalance metadata
> # Thread 1 no longer takes a lock on 0_1 as it is empty
> # However, that lock on 0_1 owned by Thread 1 remains in StateDirectory
> # All consumers re-join and sync successfully, receiving their new
> assignments
> # Thread 2 on Instance B is assigned task 0_1
> # Thread 2 cannot take a lock on 0_1 in the StateDirectory because it is
> still being held by Thread 1
> # Thread 2 remains in rebalancing state, and cannot make progress on task
> 0_1, or any other tasks it has assigned.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
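The check-then-lock race in the steps above, and the shape of a fix (re-validate the directory after taking the lock, and release the lock if the cleanup thread won the race), can be sketched in miniature. This is a hypothetical in-memory reduction, not the actual StateDirectory code or the merged patch:

```java
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

public class TaskLockDemo {
    // "Disk state" is modeled as a set of non-empty task directories; the
    // cleanup thread may delete a directory between the emptiness check and
    // the lock (the raceWindow callback). Re-checking after the lock is
    // taken, and releasing it when cleanup won the race, avoids leaving an
    // orphaned lock that later blocks another thread from taking the task.
    final Set<String> nonEmptyTaskDirs = ConcurrentHashMap.newKeySet();
    final Map<String, String> lockOwners = new ConcurrentHashMap<>();

    boolean lockIfStillNonEmpty(String taskId, String thread, Runnable raceWindow) {
        if (!nonEmptyTaskDirs.contains(taskId)) {
            return false;                        // nothing on disk, no lock needed
        }
        raceWindow.run();                        // cleanup may delete the dir here
        lockOwners.putIfAbsent(taskId, thread);  // take the lock
        if (!nonEmptyTaskDirs.contains(taskId)) {
            lockOwners.remove(taskId, thread);   // re-check under the lock; release
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        TaskLockDemo demo = new TaskLockDemo();
        demo.nonEmptyTaskDirs.add("0_1");
        boolean locked = demo.lockIfStillNonEmpty(
                "0_1", "StreamThread-1", () -> demo.nonEmptyTaskDirs.remove("0_1"));
        // Cleanup won the race: no lock is left behind for task 0_1.
        System.out.println(locked + " " + demo.lockOwners.isEmpty());
    }
}
```

Without the re-check, the sketch reproduces step 13 of the report: the lock for 0_1 would stay in `lockOwners` even though its thread never uses the directory, blocking the task's next owner.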
[jira] [Assigned] (KAFKA-16025) Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reassigned KAFKA-16025: -- Assignee: Sabit > Streams StateDirectory has orphaned locks after rebalancing, blocking future > rebalancing > > > Key: KAFKA-16025 > URL: https://issues.apache.org/jira/browse/KAFKA-16025 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.4.0 > Environment: Linux >Reporter: Sabit >Assignee: Sabit >Priority: Major > Fix For: 3.8.0 > > > Hello, > > We are encountering an issue where during rebalancing, we see streams threads > on one client get stuck in rebalancing. Upon enabling debug logs, we saw that > some tasks were having issues initializing due to failure to grab a lock in > the StateDirectory: > > {{2023-12-14 22:51:57.352000Z stream-thread > [i-0f1a5e7a42158e04b-StreamThread-14] Could not initialize task 0_51 since: > stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] standby-task [0_51] > Failed to lock the state directory for task 0_51; will retry}} > > We were able to reproduce this behavior reliably on 3.4.0. This is the > sequence that triggers the bug. > Assume in a streams consumer group, there are 5 instances (A, B, C, D, E), > each with 5 threads (1-5), and the consumer is using stateful tasks which > have state stores on disk. There are 10 active tasks and 10 standby tasks. > # Instance A is deactivated > # As an example, lets say task 0_1, previously on instance B, moves to > instance C > # Task 0_1 leaves behind it's state directory on Instance B's disk, > currently unused, and no lock for it exists in Instance B's StateDirectory > in-memory lock tracker > # Instance A is re-activated > # Streams thread 1 on Instance B is asked to re-join the consumer group due > to a new member being added > # As part of re-joining, thread 1 lists non-empty state directories in order > to report the offset's it has in it's state stores as part of it's metadata. 
> Thread 1 sees that the directory for 0_1 is not empty. > # The cleanup thread on instance B runs. The cleanup thread locks state > store 0_1, sees the directory for 0_1 was last modified more than > `state.cleanup.delay.ms` ago, deletes it, and unlocks it successfully > # Thread 1 takes a lock on directory 0_1 due to it being found not-empty > before, unaware that the cleanup has run between the time of the check and > the lock. It tracks this lock in it's own in-memory store, in addition to > StateDirectory's in-memory lock store > # Thread 1 successfully joins the consumer group > # After every consumer in the group joins the group, assignments are > calculated, and then every consumer calls sync group to receive the new > assignments > # Thread 1 on Instance B calls sync group but gets an error - the group > coordinator has triggered a new rebalance and all members must rejoin the > group > # Thread 1 again lists non-empty state directories in order to report the > offset's it has in it's state stores as part of it's metadata. Prior to doing > so, it clears it's in-memory store tracking the locks it has taken for the > purpose of gathering rebalance metadata > # Thread 1 no longer takes a lock on 0_1 as it is empty > # However, that lock on 0_1 owned by Thread 1 remains in StateDirectory > # All consumers re-join and sync successfully, receiving their new > assignments > # Thread 2 on Instance B is assigned task 0_1 > # Thread 2 cannot take a lock on 0_1 in the StateDirectory because it is > still being held by Thread 1 > # Thread 2 remains in rebalancing state, and cannot make progress on task > 0_1, or any other tasks it has assigned. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16025) Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-16025: --- Fix Version/s: 3.8.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16025) Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801927#comment-17801927 ] A. Sophie Blee-Goldman commented on KAFKA-16025: Nice find and writeup of the race condition – I'll take a look at the fix -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16055) Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders
[ https://issues.apache.org/jira/browse/KAFKA-16055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801924#comment-17801924 ] A. Sophie Blee-Goldman commented on KAFKA-16055: See this discussion on the user mailing list for additional context: [https://lists.apache.org/thread/gpct1275bfqovlckptn3lvf683qpoxol] > Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders > > > Key: KAFKA-16055 > URL: https://issues.apache.org/jira/browse/KAFKA-16055 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.1 >Reporter: Kohei Nozaki >Priority: Minor > Labels: newbie, newbie++ > > This was originally raised in [a kafka-users > post|https://lists.apache.org/thread/gpct1275bfqovlckptn3lvf683qpoxol]. > There is a HashMap stored in QueryableStoreProvider#storeProviders ([code > link|https://github.com/apache/kafka/blob/3.6.1/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L39]) > which can be mutated by a KafkaStreams#removeStreamThread() call. This can > be problematic when KafkaStreams#store is called from a separate thread. > We need to make this part of the code thread-safe by replacing the map with a > ConcurrentHashMap and/or using an existing locking mechanism. -- This message was sent by Atlassian Jira (v8.20.10#820010)
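A minimal sketch of the suggested ConcurrentHashMap fix. The class and method names below are simplified placeholders loosely mirroring the description, not the real QueryableStoreProvider internals:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the proposed fix: a plain HashMap here can throw
// ConcurrentModificationException when one thread iterates (the
// KafkaStreams#store path) while another mutates the map (the
// KafkaStreams#removeStreamThread path). ConcurrentHashMap's iterators
// are weakly consistent and never throw.
class StoreProviderRegistry {
    private final Map<String, Object> storeProviders = new ConcurrentHashMap<>();

    void addStreamThread(String threadName, Object provider) {
        storeProviders.put(threadName, provider);
    }

    void removeStreamThread(String threadName) {
        storeProviders.remove(threadName); // safe while another thread iterates
    }

    int providerCount() {
        int n = 0;
        for (Map.Entry<String, Object> e : storeProviders.entrySet()) {
            n++; // with a plain HashMap, a concurrent remove() could throw here
        }
        return n;
    }
}
```

The trade-off is that a reader may or may not observe a concurrent removal, but it will never fail with an exception, which matches the behavior the ticket asks for.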
[jira] [Updated] (KAFKA-16055) Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders
[ https://issues.apache.org/jira/browse/KAFKA-16055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-16055: --- Labels: newbie newbie++ (was: ) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15798) Flaky Test NamedTopologyIntegrationTest.shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology()
[ https://issues.apache.org/jira/browse/KAFKA-15798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17789901#comment-17789901 ] A. Sophie Blee-Goldman commented on KAFKA-15798: cc [~mjsax] [~wcarlson5] – maybe some motivation to remove, or at least block, the named topologies feature in 3.7 > Flaky Test > NamedTopologyIntegrationTest.shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology() > - > > Key: KAFKA-15798 > URL: https://issues.apache.org/jira/browse/KAFKA-15798 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Justine Olshan >Priority: Major > Labels: flaky-test > > I saw a few examples recently. 2 have the same error, but the third is > different > [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14629/22/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology___2/] > [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2365/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_21_and_Scala_2_13___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology__/] > > The failure is like > {code:java} > java.lang.AssertionError: Did not receive all 5 records from topic > output-stream-1 within 6 ms, currently accumulated data is [] Expected: > is a value equal to or greater than <5> but: <0> was less than <5>{code} > The other failure was > [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2365/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology__/] > {code:java} > java.lang.AssertionError: Expected: <[0, 1]> but: was <[0]>{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15798) Flaky Test NamedTopologyIntegrationTest.shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology()
[ https://issues.apache.org/jira/browse/KAFKA-15798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17789898#comment-17789898 ] A. Sophie Blee-Goldman commented on KAFKA-15798: Took a quick look at this in the name of running down some of the worst flaky tests in Streams. I think it's pretty clear that this is failing because of the state updater thread (see below), but it's not as clear to me whether this hints at a real bug with the state updater thread or whether it only broke the named topologies feature. If it's the latter, we should probably block people from using named topologies when the state updater thread is enabled in 3.7. Although I'm actually leaning towards going a step further and just taking out the named topologies altogether – we can just remove the "public" API classes for now, as extracting all the internal logic is somewhat of a bigger project that we shouldn't rush. Of course, this is all assuming there is something about the state updater that broke named topologies – someone more familiar with the state updater should definitely verify that this isn't a real bug in normal Streams first! cc [~cadonna] [~lucasb] Oh, and this is how I know the state updater thread is responsible: if you look at [the graph of failure rates for this test|https://ge.apache.org/scans/tests?search.names=Git%20branch&search.relativeStartTime=P90D&search.rootProjectNames=kafka&search.timeZoneId=America%2FLos_Angeles&search.values=trunk&tests.container=org.apache.kafka.streams.integration.NamedTopologyIntegrationTest&tests.sortField=FLAKY&tests.test=shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology()], you'll see it goes from literally zero flakiness to the 2nd most commonly failing test in all of Streams on Oct 4th. This is the day we turned on the state updater thread by default. (It's also a bit concerning that we didn't catch this sooner. 
The uptick in failure rate of this test is actually quite sudden. Would be great if we could somehow manually alert on this sort of thing) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15843) Review consumer onPartitionsAssigned called with empty partitions
[ https://issues.apache.org/jira/browse/KAFKA-15843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17787681#comment-17787681 ] A. Sophie Blee-Goldman commented on KAFKA-15843: Hey [~lianetm] I worked on the old ConsumerRebalanceListener a lot and can provide some context here. The reason #onPartitionsAssigned is still called on an empty set of partitions is largely historical, and the tl;dr is that it's probably ok to change this behavior in the new consumer if it won't impact the older one. For some background, in the old days of eager rebalancing (which is still the default protocol in the original consumer client), we would always invoke both #onPartitionsRevoked and #onPartitionsAssigned at the start and end of a rebalance, respectively. And since all partitions are revoked and re-assigned with eager rebalancing, there (usually) was a non-empty set of partitions passed into each of these. Then came incremental cooperative rebalancing: we no longer revoked & reassigned all the partitions and instead acted only on the incremental change in partition assignment. So #onPartitionsRevoked only gets the subset of partitions that are being migrated to a different consumer, and #onPartitionsAssigned only gets newly-added partitions. Also, with the cooperative protocol, #onPartitionsRevoked would be invoked at the _end_ of a rebalance, rather than at the beginning. However we still had to maintain compatibility across the two protocols for those implementing ConsumerRebalanceListener. And it was common to use the rebalance listener not just to listen in on the partition assignment, but to notify about the start and end of a rebalance. Therefore we decided to guarantee that #onPartitionsAssigned would still be invoked at the end of every rebalance, in case of users relying on this callback to detect the end of a rebalance. 
However, since #onPartitionsRevoked is no longer even invoked at the start of a cooperative rebalance, it can't be used to detect the start of one anymore and there was no reason to continue invoking it on every rebalance unless there were actually some partitions that were revoked. You'll notice that if the eager protocol is still enabled, the #onPartitionsRevoked callback actually is still invoked regardless of whether there's a non-empty set of partitions passed into it or not. #onPartitionsLost is a bit of a special case, since (a) it was only added around the time cooperative rebalancing was implemented, as there was no old behavior for us to maintain compatibility with, and (b) it doesn't happen during a regular rebalance but instead only to notify the rebalance listener of a special case, ie that it has lost ownership of these partitions (but for that exact reason cannot commit offsets for them, as would normally occur in an #onPartitionsRevoked). If there aren't any lost partitions, there's no reason to invoke this callback (and it would be misleading to do so) My understanding is that there is no "eager" or "cooperative" protocol in the new consumer, it's an entirely new protocol, so I would assume you're not obligated to maintain compatibility for existing ConsumerRebalanceListener implementations. In that case, it probably does not make sense to guarantee that #onPartitionsAssigned is invoked on every rebalance regardless, even if no new partitions are added. 
I'm not super familiar with the KIP-848 implementation details, but I would assume that users can still use the ConsumerPartitionAssignor callbacks to effectively detect the start and end of a rebalance (via #subscriptionUserdata and #onAssignment) Of course, if you intend to change the behavior in a way that would affect the old consumer as well, then you'll need to give Kafka Streams time to adopt a new approach since we currently still rely on #onPartitionsAssigned to notify us when a rebalance ends. I'm pretty sure we don't plan on using the new consumer right away though, since we'll need to make a number of changes like this one before we can do so. > Review consumer onPartitionsAssigned called with empty partitions > - > > Key: KAFKA-15843 > URL: https://issues.apache.org/jira/browse/KAFKA-15843 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > > Legacy coordinator triggers onPartitionsAssigned with empty assignment (which > is not the case when triggering onPartitionsRevoked or Lost). This is the > behaviour of the legacy coordinator, and the new consumer implementation > maintains the same principle. We should review this to fully understand if it > is really need
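The invocation contract described in the comment above can be modeled in a few lines. The interface below is a simplified stand-in for Kafka's ConsumerRebalanceListener and the dispatch logic is an illustrative sketch of the stated rules (onPartitionsAssigned fires at the end of every cooperative rebalance, even with no new partitions; onPartitionsRevoked and onPartitionsLost fire only for non-empty sets), not the real client code:

```java
import java.util.Collection;

// Simplified stand-in for ConsumerRebalanceListener; not the real API.
interface RebalanceListenerModel {
    void onPartitionsAssigned(Collection<String> partitions);
    void onPartitionsRevoked(Collection<String> partitions);
    void onPartitionsLost(Collection<String> partitions);
}

// Toy dispatcher encoding the cooperative-protocol contract described above.
class CooperativeDispatch {
    static void completeRebalance(RebalanceListenerModel listener,
                                  Collection<String> revoked,
                                  Collection<String> newlyAssigned) {
        if (!revoked.isEmpty()) {
            listener.onPartitionsRevoked(revoked); // only when non-empty
        }
        // Always invoked, even with an empty set, so callers can treat this
        // callback as "the rebalance has finished".
        listener.onPartitionsAssigned(newlyAssigned);
    }

    static void lostPartitions(RebalanceListenerModel listener,
                               Collection<String> lost) {
        if (!lost.isEmpty()) {
            listener.onPartitionsLost(lost); // special case, never empty
        }
    }
}
```

This makes the compatibility concern concrete: any code that keys "rebalance done" off the unconditional onPartitionsAssigned call (as Kafka Streams does) breaks if that final invocation becomes conditional on a non-empty set.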
[jira] [Commented] (KAFKA-15834) Subscribing to non-existent topic blocks StreamThread from stopping
[ https://issues.apache.org/jira/browse/KAFKA-15834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17787679#comment-17787679 ] A. Sophie Blee-Goldman commented on KAFKA-15834: Found the ticket: https://issues.apache.org/jira/browse/KAFKA-9398 And yes, it's still unresolved. Given all the above, I think we can honestly just disable/remove the test, as the named topologies feature was never made into a real public API. I do know of a few people who are using it anyway but they're aware it was only an experimental feature and not fully supported by Streams. So imo we don't need to go out of our way to fix any flaky tests: provided we can demonstrate that the issue is specific to named topologies and not potentially an issue with Streams itself. Of course in this case it's actually the latter, but we've recognized the root cause as a known issue, so I don't think there's anything more this test can do for us besides be flaky and annoy everyone. Thanks for digging into this! > Subscribing to non-existent topic blocks StreamThread from stopping > --- > > Key: KAFKA-15834 > URL: https://issues.apache.org/jira/browse/KAFKA-15834 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.0 >Reporter: Greg Harris >Priority: Major > > In > NamedTopologyIntegrationTest#shouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics > a topology is created which references an input topic which does not exist. > The test as-written passes, but the KafkaStreams#close(Duration) at the end > times out, and leaves StreamsThreads running. > From some cursory investigation it appears that this is happening: > 1. The consumer calls the StreamsPartitionAssignor, which calls > TaskManager#handleRebalanceStart as a side-effect > 2. handleRebalanceStart sets the rebalanceInProgress flag > 3. This flag is checked by StreamThread.runLoop, and causes the loop to > remain running. > 4. 
The consumer never calls StreamsRebalanceListener#onPartitionsAssigned, > because the topic does not exist > 5. Because no partitions are ever assigned, the > TaskManager#handleRebalanceComplete never clears the rebalanceInProgress flag > > This log message is printed in a tight loop while the close is ongoing and > the consumer is being polled with zero duration: > {noformat} > [2023-11-15 11:42:43,661] WARN [Consumer > clientId=NamedTopologyIntegrationTestshouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics-942756f8-5213-4c44-bb6b-5f805884e026-StreamThread-1-consumer, > > groupId=NamedTopologyIntegrationTestshouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics] > Received unknown topic or partition error in fetch for partition > unique_topic_prefix-topology-1-store-repartition-0 > (org.apache.kafka.clients.consumer.internals.FetchCollector:321) > {noformat} > Practically, this means that this test leaks two StreamsThreads and the > associated clients and sockets, and delays the completion of the test until > the KafkaStreams#close(Duration) call times out. > Either we should change the rebalanceInProgress flag to avoid getting stuck > in this rebalance state, or figure out a way to shut down a StreamsThread > that is in an extended rebalance state during shutdown. -- This message was sent by Atlassian Jira (v8.20.10#820010)
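The five-step hang above can be reduced to a toy model. Names are illustrative, not the actual StreamThread code: the point is that handleRebalanceStart sets a flag that only handleRebalanceComplete (driven by onPartitionsAssigned) clears, so if the assignment callback never fires, the run loop can never exit, even after shutdown is requested:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Toy model of the shutdown hang; all names are illustrative.
class StreamThreadLoopModel {
    private final AtomicBoolean rebalanceInProgress = new AtomicBoolean(false);
    private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);

    // Step 1-2: the assignor callback marks a rebalance as in progress.
    void handleRebalanceStart() { rebalanceInProgress.set(true); }

    // Step 4-5: only the assignment callback ever clears the flag.
    void handleRebalanceComplete() { rebalanceInProgress.set(false); }

    void requestShutdown() { shutdownRequested.set(true); }

    // Step 3: the run loop keeps polling while a rebalance appears to be
    // in progress, even after shutdown has been requested.
    boolean runLoopShouldContinue() {
        return !shutdownRequested.get() || rebalanceInProgress.get();
    }
}
```

With a missing input topic, handleRebalanceComplete is never reached in this model, reproducing the tight poll loop and the KafkaStreams#close(Duration) timeout the ticket describes.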
[jira] [Commented] (KAFKA-15834) Subscribing to non-existent topic blocks StreamThread from stopping
[ https://issues.apache.org/jira/browse/KAFKA-15834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17787678#comment-17787678 ] A. Sophie Blee-Goldman commented on KAFKA-15834: I just checked the current code and it looks like we do still respect the guarantee of invoking #onPartitionsAssigned in all cases. So I don't think step #4 is correct. Did you happen to see anything in the logs that would suggest the StreamThread was continuing in its regular loop and never stopping due to the rebalanceInProgress flag? Or is it possible that it's hanging somewhere in the shutdown process (or even in the rebalance itself)? I'm just wondering if it might be related to the Producer, not the Consumer. I know we had some issues with the Producer#close hanging in the past, and that it was related to users deleting topics from under the app, which would be a similar situation to what you found here. I'm not sure if we ever fixed that, maybe [~mjsax] will remember the ticket for the Producer issue? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15834) Subscribing to non-existent topic blocks StreamThread from stopping
[ https://issues.apache.org/jira/browse/KAFKA-15834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17787677#comment-17787677 ] A. Sophie Blee-Goldman commented on KAFKA-15834: Yeah great analysis, thanks [~gharris1727] I'm a bit confused by point #4, however – is this a change in behavior (possibly related to KIP-848)? It's my understanding that the #onPartitionsAssigned callback is guaranteed to always be invoked regardless of whether the set of partitions being newly assigned is non-empty or not. This is in contrast with the #onPartitionsRevoked and #onPartitionsLost callbacks, which are only invoked when the set of partitions to act upon is non-empty. I think one could argue that this inconsistency is not ideal, but the behavior of always invoking #onPartitionsAssigned is a stated guarantee in the public contract of ConsumerRebalanceListener. See [this paragraph|https://github.com/apache/kafka/blob/254335d24ab6b6d13142dcdb53fec3856c16de9e/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java#L67] of the javadocs. In other words, I don't think we can change this without a KIP, and if this behavior was modified recently then we need to revert that change until a KIP is accepted. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15326) Decouple Processing Thread from Polling Thread
[ https://issues.apache.org/jira/browse/KAFKA-15326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17783436#comment-17783436 ] A. Sophie Blee-Goldman commented on KAFKA-15326: [~lbrutschy] it looks like there are at least 10 PRs associated with this ticket/change, judging by the naming sequence, but only half of them are linked in this ticket. Can you locate the remaining 5 and attach them to this ticket so the entire implementation is contained here? > Decouple Processing Thread from Polling Thread > -- > > Key: KAFKA-15326 > URL: https://issues.apache.org/jira/browse/KAFKA-15326 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Critical > > As part of an ongoing effort to implement a better threading architecture in > Kafka Streams, we decouple N stream threads into N polling threads and N > processing threads. The effort to consolidate the N polling threads into a single thread is a follow-up to this ticket. -- This message was sent by Atlassian Jira (v8.20.10#820010)
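As a rough illustration of the decoupling the ticket describes (purely a sketch, not the actual Kafka Streams implementation), one stream thread can be split into a polling thread and a processing thread joined by a bounded buffer:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Toy sketch: a polling thread hands records to a processing thread over a
// bounded queue. Names and structure are invented for illustration only.
public class DecoupledThreadsSketch {
    static final String POISON = "__shutdown__";
    static final List<String> processed = new ArrayList<>();

    public static void run() throws InterruptedException {
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(10);

        // Polling thread: stands in for consumer.poll(...) fetching records.
        Thread poller = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) buffer.put("record-" + i);
                buffer.put(POISON); // signal shutdown to the processor
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Processing thread: drains the buffer and does the actual work,
        // so a slow processor no longer delays the next poll call.
        Thread processor = new Thread(() -> {
            try {
                while (true) {
                    String record = buffer.take();
                    if (record.equals(POISON)) break;
                    processed.add(record);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        poller.start();
        processor.start();
        poller.join();
        processor.join();
    }

    public static void main(String[] args) throws InterruptedException {
        run();
        System.out.println("processed " + processed.size() + " records: " + processed);
    }
}
```

The bounded queue gives natural backpressure: if processing falls behind, `put` blocks the poller instead of letting records pile up without limit.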
[jira] [Created] (KAFKA-15782) Establish concrete project conventions to define public APIs that require a KIP
A. Sophie Blee-Goldman created KAFKA-15782: -- Summary: Establish concrete project conventions to define public APIs that require a KIP Key: KAFKA-15782 URL: https://issues.apache.org/jira/browse/KAFKA-15782 Project: Kafka Issue Type: Improvement Reporter: A. Sophie Blee-Goldman There seems to be no concrete definition that establishes project-specific conventions for what is and is not considered a public API change that requires a KIP. This results in frequent drawn-out debates that revisit the same topic and slow things down, and often ends up forcing trivial changes through the KIP process. For a recent example, KIP-998 was required for a one-line change just to add the "protected" access modifier to an otherwise package-private class. See [this comment thread|https://github.com/apache/kafka/pull/14681#discussion_r1378591228] for the full debate on this subject. It would be beneficial and in the long run save us all time to just sit down and hash out the project conventions, such as whether a package-private/protected method on a non-final java class is to be considered a public API, even if the method itself is/was never a public method. This will of course require a KIP, but should help to establish some ground rules to avoid any more superfluous KIPs in the future -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15781) Change ProducerConfig(props, doLog) constructor to protected
[ https://issues.apache.org/jira/browse/KAFKA-15781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-15781: --- Description: See [https://github.com/apache/kafka/pull/14681] KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-998%3A+Give+ProducerConfig%28props%2C+doLog%29+constructor+protected+access was:See https://github.com/apache/kafka/pull/14681 > Change ProducerConfig(props, doLog) constructor to protected > > > Key: KAFKA-15781 > URL: https://issues.apache.org/jira/browse/KAFKA-15781 > Project: Kafka > Issue Type: Improvement > Components: producer >Reporter: A. Sophie Blee-Goldman >Assignee: A. Sophie Blee-Goldman >Priority: Major > Labels: kip > > See [https://github.com/apache/kafka/pull/14681] > > KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-998%3A+Give+ProducerConfig%28props%2C+doLog%29+constructor+protected+access -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15781) Change ProducerConfig(props, doLog) constructor to protected
A. Sophie Blee-Goldman created KAFKA-15781: -- Summary: Change ProducerConfig(props, doLog) constructor to protected Key: KAFKA-15781 URL: https://issues.apache.org/jira/browse/KAFKA-15781 Project: Kafka Issue Type: Improvement Components: producer Reporter: A. Sophie Blee-Goldman Assignee: A. Sophie Blee-Goldman See https://github.com/apache/kafka/pull/14681 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14419) Failed SyncGroup leading to partitions lost due to processing during rebalances
[ https://issues.apache.org/jira/browse/KAFKA-14419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-14419: --- Summary: Failed SyncGroup leading to partitions lost due to processing during rebalances (was: Same message consumed again by the same stream task after partition is lost and reassigned) > Failed SyncGroup leading to partitions lost due to processing during > rebalances > --- > > Key: KAFKA-14419 > URL: https://issues.apache.org/jira/browse/KAFKA-14419 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.1 > Environment: AWS EC2 CentOS Linux 3.10.0-1160.76.1.el7.x86_64 >Reporter: Mikael >Priority: Major > > Trigger scenario: > Four Kafka client application instances on separate EC2 instances with a > total of 8 active and 8 standby stream tasks for the same stream topology, > consuming from an input topic with 8 partitions. Sometimes a handful of > messages are consumed twice by one of the stream tasks when stream tasks on > another application instance join the consumer group after an application > instance restart. > Additional information: > Messages are produced to the topic by another Kafka streams topology deployed > on the same four application instances. I have verified that each message is > only produced once by enabling debug logging in the topology flow right > before producing each message to the topic. 
> Logs from stream thread with duplicate consumption: > > {code:java} > 2022-11-21 15:09:33,677 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Request joining group due to: group is > already rebalancing > 2022-11-21 15:09:33,677 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] (Re-)joining group > Input records consumed for the first time > 2022-11-21 15:09:33,919 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Successfully joined group with > generation Generation{generationId=8017, > memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', > protocol='stream'} > 2022-11-21 15:09:33,920 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:826] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] SyncGroup failed: The group began > another rebalance. Need to re-join the group. 
Sent generation was > Generation{generationId=8017, > memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', > protocol='stream'} > 2022-11-21 15:09:33,922 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1019] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Resetting generation due to: > encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response > 2022-11-21 15:09:33,922 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Request joining group due to: > encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response > 2022-11-21 15:09:33,923 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:819] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Giving away all assigned partitions as > lost since generation/membe
[jira] [Commented] (KAFKA-14419) Same message consumed again by the same stream task after partition is lost and reassigned
[ https://issues.apache.org/jira/browse/KAFKA-14419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17780119#comment-17780119 ] A. Sophie Blee-Goldman commented on KAFKA-14419: [~Carlstedt] first off, do you by any chance have especially long processing latencies? For example iterating over a large range of a state store, making a blocking remote call, etc. Since you're using the default max poll interval, I might recommend starting out by lowering the max.poll.records first, as you generally don't want to make the max.poll.interval too small – ideally that would be the last resort. IIRC the default max.poll.records in Streams is set to 1,000 – so maybe try cutting that down to just 100? You can (and should) experiment a bit to find a good balance for your app in the steady state. Though I suppose if you're seeing this frequently enough/able to reproduce it reliably, you could set it to something extremely low as a test, like 10 let's say, just to see if that solves the issue. I'll try to put together a PR for this sometime soon, maybe early next week, so you can also wait for that and try running with a patched version of Streams. (Would a trunk/3.7 patched version be alright? I'm happy to create a branch with the fix ported to an earlier version of Kafka Streams if you'd prefer, just let me know which version you need.) Btw: I'm going to update the ticket title if you don't mind, so that it reflects the bug described in my last response. 
The current title is just describing the correct behavior of Kafka Streams working as intended > Same message consumed again by the same stream task after partition is lost > and reassigned > -- > > Key: KAFKA-14419 > URL: https://issues.apache.org/jira/browse/KAFKA-14419 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.1 > Environment: AWS EC2 CentOS Linux 3.10.0-1160.76.1.el7.x86_64 >Reporter: Mikael >Priority: Major > > Trigger scenario: > Four Kafka client application instances on separate EC2 instances with a > total of 8 active and 8 standby stream tasks for the same stream topology, > consuming from an input topic with 8 partitions. Sometimes a handful of > messages are consumed twice by one of the stream tasks when stream tasks on > another application instance join the consumer group after an application > instance restart. > Additional information: > Messages are produced to the topic by another Kafka streams topology deployed > on the same four application instances. I have verified that each message is > only produced once by enabling debug logging in the topology flow right > before producing each message to the topic. 
> Logs from stream thread with duplicate consumption: > > {code:java} > 2022-11-21 15:09:33,677 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Request joining group due to: group is > already rebalancing > 2022-11-21 15:09:33,677 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] (Re-)joining group > Input records consumed for the first time > 2022-11-21 15:09:33,919 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Successfully joined group with > generation Generation{generationId=8017, > memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', > protocol='stream'} > 2022-11-21 15:09:33,920 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:826] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] SyncGroup failed: The group began > another rebalance. Need to re-join the group. 
Sent generation was > Generation{generationId=8017, > memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f2
[jira] [Comment Edited] (KAFKA-14419) Same message consumed again by the same stream task after partition is lost and reassigned
[ https://issues.apache.org/jira/browse/KAFKA-14419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779731#comment-17779731 ] A. Sophie Blee-Goldman edited comment on KAFKA-14419 at 10/26/23 10:49 PM: --- Hey, sorry for the long delay – I'm still trying to refresh my memory of this ticket and the related one, but after looking at it again with fresh eyes I think I figured out what's going on here. If I'm reading this situation correctly, it does seem like there is some less-than-ideal behavior that we might be able to improve. Based on your recent logs, I think the root cause here is basically the same as what I fixed in [https://github.com/apache/kafka/pull/12869], just to a lesser degree. The issue in that patch was that Streams would sometimes trigger a followup rebalance even while the current rebalance was still going on, which led some members to drop out of the group upon hitting a REBALANCE_IN_PROGRESS error during the SyncGroup phase. The fix basically just made the StreamThread wait until the rebalance was over before triggering a followup. This should have been sufficient, but I suppose it is still theoretically possible to run into the same issue. Taking a deeper look at the original issue, it would only arise because of how Streams uses a non-blocking poll, which allows it to return to its main loop and continue processing in the background during a rebalance. 
A lot of things happen throughout the loop, but the relevant operations here are as follows: # Check the rebalance "schedule" and trigger one if: ## it has been requested for a time equal to or less than the current time ## the consumer is not actively participating in a rebalance (i.e. sometime after a SyncGroup response is received but before sending a new JoinGroup request) # Poll for more records, during which time one or more of the following may occur: ## consumer enters a new rebalance by sending a JoinGroup request ## consumer participates in a rebalance by receiving the JoinGroup response and sending a SyncGroup request ## consumer completes an ongoing rebalance by receiving a SyncGroup response, after which it can commit offsets for revoked tasks and initialize new ones # Process more records, which might have been either: ## Newly-consumed during the last poll call, or ## Left over from a previous batch that could not be fully processed before needing to return to poll due to running out of time in the max.poll.interval So here's what I'm imagining: let's say we have two consumers, A and B, with A being the group leader/assignor. # A new rebalance begins, and both threads send their JoinGroup requests before returning to process some records # A doesn't have many records left to process, so it quickly returns to the poll call in step 2 of the loop. 
However B is still processing a large backlog # A performs the assignment and determines that a followup rebalance is needed, so it sets the rebalance schedule to # After the assignment, A sends it out in the SyncGroup request and exits the poll call # A does some processing (or not) before returning to the poll and receiving the SyncGroup response # A exits the poll again, and this time when it reaches step 1 of the loop, it is now able to trigger the new rebalance # After A has requested a new rebalance, it finally returns to the poll call one more time, and rejoins the group/sends a JoinGroup request to kick it off # This whole time, B has had a large backlog of records, or a very high max.poll.interval, or a long GC pause – you get the idea. It's stuck in step 3 # B finally finishes processing and leaves step 3, returning to the poll call during which it sends a very late SyncGroup request. # When the SyncGroup response is eventually received, B gets the REBALANCE_IN_PROGRESS error and fails its rebalance since the generation is stale The fundamental issue here is that B is theoretically able to spend up to the max.poll.interval between sending its SyncGroup request and returning to poll to process the SyncGroup response, but A might be able to process its SyncGroup response, process its records, and then trigger a new rebalance all in that timeframe. This could happen when the task assignment is heavily imbalanced, for example. I can see a few potential paths forward here, and a fourth option that is more of a temporary workaround for [~Carlstedt] if you're still encountering this. None of them are really a guarantee, but they would help. For the most comprehensive fix we might want to consider doing two or even all three of these: Option 1: add a small delay to the Streams followup rebalance trigger to help the entire group finish the SyncGroup phase before beginning the next rebalance. Option 2: set a shorter upper bound on the maximum tim
[jira] [Commented] (KAFKA-14419) Same message consumed again by the same stream task after partition is lost and reassigned
[ https://issues.apache.org/jira/browse/KAFKA-14419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779731#comment-17779731 ] A. Sophie Blee-Goldman commented on KAFKA-14419: Hey, sorry for the long delay – I'm still trying to refresh my memory of this ticket and the related one, but after looking at it again with fresh eyes I think I figured out what's going on here. If I'm reading this situation correctly, it does seem like there is some less-than-ideal behavior that we might be able to improve. Based on your recent logs, I think the root cause here is basically the same as what I fixed in [https://github.com/apache/kafka/pull/12869], just to a lesser degree. The issue in that patch was that Streams would sometimes trigger a followup rebalance even while the current rebalance was still going on, which led some members to drop out of the group upon hitting a REBALANCE_IN_PROGRESS error during the SyncGroup phase. The fix basically just made the StreamThread wait until the rebalance was over before triggering a followup. This should have been sufficient, but I suppose it is still theoretically possible to run into the same issue. Taking a deeper look at the original issue, it would only arise because of how Streams uses a non-blocking poll, which allows it to return to its main loop and continue processing in the background during a rebalance. 
A lot of things happen throughout the loop, but the relevant operations here are as follows: # Check the rebalance "schedule" and trigger one if: ## it has been requested for a time equal to or less than the current time ## the consumer is not actively participating in a rebalance (i.e. sometime after a SyncGroup response is received but before sending a new JoinGroup request) # Poll for more records, during which time one or more of the following may occur: ## consumer enters a new rebalance by sending a JoinGroup request ## consumer participates in a rebalance by receiving the JoinGroup response and sending a SyncGroup request ## consumer completes an ongoing rebalance by receiving a SyncGroup response, after which it can commit offsets for revoked tasks and initialize new ones # Process more records, which might have been either: ## Newly-consumed during the last poll call, or ## Left over from a previous batch that could not be fully processed before needing to return to poll due to running out of time in the max.poll.interval So here's what I'm imagining: let's say we have two consumers, A and B, with A being the group leader/assignor. # A new rebalance begins, and both threads send their JoinGroup requests before returning to process some records # A doesn't have many records left to process, so it quickly returns to the poll call in step 2 of the loop. 
However B is still processing a large backlog # A performs the assignment and determines that a followup rebalance is needed, so it sets the rebalance schedule to # After the assignment, A sends it out in the SyncGroup request and exits the poll call # A does some processing (or not) before returning to the poll and receiving the SyncGroup response # A exits the poll again, and this time when it reaches step 1 of the loop, it is now able to trigger the new rebalance # After A has requested a new rebalance, it finally returns to the poll call one more time, and rejoins the group/sends a JoinGroup request to kick it off # This whole time, B has had a large backlog of records, or a very high max.poll.interval, or a long GC pause – you get the idea. It's stuck in step 3 # B finally finishes processing and leaves step 3, returning to the poll call during which it sends a very late SyncGroup request. # When the SyncGroup response is eventually received, B gets the REBALANCE_IN_PROGRESS error and fails its rebalance since the generation is stale The fundamental issue here is that B is theoretically able to spend up to the max.poll.interval between sending its SyncGroup request and returning to poll to process the SyncGroup response, but A might be able to process its SyncGroup response, process its records, and then trigger a new rebalance all in that timeframe. This could happen when the task assignment is heavily imbalanced, for example. I can see a few potential paths forward here, and a fourth option that is more of a temporary workaround for [~Carlstedt] if you're still encountering this. None of them are really a guarantee, but they would help. For the most comprehensive fix we might want to consider doing two or even all three of these: Option 1: add a small delay to the Streams followup rebalance trigger to help the entire group finish the SyncGroup phase before beginning the next rebalance. 
Option 2: set a shorter upper bound on the maximum time a StreamThread can spend processing records while
[jira] [Resolved] (KAFKA-15116) Kafka Streams processing blocked during rebalance
[ https://issues.apache.org/jira/browse/KAFKA-15116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-15116. Resolution: Not A Problem > Kafka Streams processing blocked during rebalance > - > > Key: KAFKA-15116 > URL: https://issues.apache.org/jira/browse/KAFKA-15116 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.5.0 >Reporter: David Gammon >Priority: Major > > We have a Kafka Streams application that simply takes a message, processes > it and then produces an event out the other side. The complexity is that > there is a requirement that all events with the same partition key must be > committed before the next message is processed. > This works most of the time flawlessly but we have started to see problems > during deployments where the first message blocks the second message during a > rebalance because the first message isn’t committed before the second message > is processed. This ultimately results in transactions timing out and more > rebalancing. > We’ve tried lots of configuration to get the behaviour we require with no > luck. We’ve now put in a temporary fix so that Kafka Streams works with our > framework but it feels like this might be a missing feature or potentially a > bug. > +Example+ > Given: > * We have two messages (InA and InB). > * Both messages have the same partition key. > * A rebalance is in progress so streams is no longer able to commit. > When: > # Message InA -> processor -> OutA (not committed) > # Message InB -> processor -> blocked because #1 has not been committed -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15116) Kafka Streams processing blocked during rebalance
[ https://issues.apache.org/jira/browse/KAFKA-15116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779725#comment-17779725 ] A. Sophie Blee-Goldman commented on KAFKA-15116: I tend to agree with Matthias here – this is definitely not a bug in Kafka Streams, and the behavior you describe in the most recent message sounds like it is working as intended. You can certainly disable cooperative rebalancing by setting the UPGRADE_FROM config to something 2.3 or below, but as [~mjsax] mentioned, we plan to remove support for eager rebalancing entirely in an upcoming release so this would only be a temporary fix at best. I say "at best" because it sounds to me like even without cooperative rebalancing, you're relying on implementation details that are not part of the public contract. That's not an accusation, it just means you're putting yourself at risk of breaking changes to your application, like cooperative rebalancing, because the semantics around when to commit are considered internal to Kafka Streams and can change at any time. That said, I do agree that it would be nice if Streams had better support for blocking operations or long-running RPCs. Unfortunately that's just not the case at the moment, so you kind of have to take what you can get, or else rewrite as a plain producer/consumer app. Honestly, while I'm a big believer that almost any kind of application that follows the consume-process-produce pattern can and should be represented as a Streams app, I think this might be one of the few exceptions where it really does make more sense to implement with the plain clients. It sounds like part of the application logic is already driven by an external consumer, which together with the fact that you have strict requirements around committing as well as a shared state with external calls, indicates to me that Streams may not be the right tool for the job. 
Sort of by definition Streams is supposed to abstract away all the client interactions and all the offset commit logic, in addition to being shardable such that everything you need to process a stream of records for a given partition is local to that shard. So it's hard to imagine how to fit your application logic into a Streams app in a completely "safe" way. Again, this doesn't mean you can't/shouldn't try to stretch the boundaries of Kafka Streams, but I think it makes sense to close this ticket given that everything is working as intended. However I'd encourage you to consider filing a "New Feature" ticket for some specific functionality that would help with your use case. I believe there is already one for better blocking API/RPC support, but I think the ability to pause processing in Kafka Streams would be nice to have in general, and could be used to pause Streams during a rebalance to solve the issue you're facing. Hope that all makes sense! If you do want to consider rewriting this with the plain clients, I'm happy to give some pointers. It sounds like your Streams application is very complex already so I have to wonder if it might be simpler to write outside the framework of Kafka Streams. > Kafka Streams processing blocked during rebalance > - > > Key: KAFKA-15116 > URL: https://issues.apache.org/jira/browse/KAFKA-15116 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.5.0 >Reporter: David Gammon >Priority: Major > > We have a Kafka Streams application that simply takes a message, processes > it and then produces an event out the other side. The complexity is that > there is a requirement that all events with the same partition key must be > committed before the next message is processed. 
> This works most of the time flawlessly but we have started to see problems > during deployments where the first message blocks the second message during a > rebalance because the first message isn’t committed before the second message > is processed. This ultimately results in transactions timing out and more > rebalancing. > We’ve tried lots of configuration to get the behaviour we require with no > luck. We’ve now put in a temporary fix so that Kafka Streams works with our > framework but it feels like this might be a missing feature or potentially a > bug. > +Example+ > Given: > * We have two messages (InA and InB). > * Both messages have the same partition key. > * A rebalance is in progress so streams is no longer able to commit. > When: > # Message InA -> processor -> OutA (not committed) > # Message InB -> processor -> blocked because #1 has not been committed -- This message was sent by Atlassian Jira (v8.20.10#820010)
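For anyone weighing the plain-clients rewrite suggested in the comment above, here is a toy, Kafka-free sketch of the commit-per-record pattern it implies. All names are invented: a List stands in for the input partition and a variable for the committed offset; in a real application these would be a KafkaConsumer poll loop, a producer send, and a commitSync() call.

```java
import java.util.ArrayList;
import java.util.List;

// Simulated consume-process-produce loop that commits each record's offset
// before touching the next record, so "commit before the next message" holds
// by construction. Purely illustrative; no Kafka clients involved.
public class CommitPerRecordSketch {
    static long committedOffset = -1; // stand-in for the group's committed offset
    static final List<String> produced = new ArrayList<>();

    public static void run(List<String> partition) {
        for (int offset = 0; offset < partition.size(); offset++) {
            String record = partition.get(offset);
            produced.add("Out" + record.substring(2)); // "InA" -> "OutA"
            // commitSync() equivalent: the next record is only reached after
            // this offset is durably committed.
            committedOffset = offset;
        }
    }

    public static void main(String[] args) {
        run(List.of("InA", "InB"));
        // prints: [OutA, OutB], committed offset 1
        System.out.println(produced + ", committed offset " + committedOffset);
    }
}
```

The trade-off is throughput: a synchronous commit per record is slow, which is exactly the kind of strict commit requirement that makes the plain clients a better fit than Streams here.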
[jira] [Resolved] (KAFKA-12550) Introduce RESTORING state to the KafkaStreams FSM
[ https://issues.apache.org/jira/browse/KAFKA-12550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-12550. Resolution: Won't Fix Closing this out since its usefulness is preempted by the StateUpdaterThread and the move of restoration out of the main StreamThread > Introduce RESTORING state to the KafkaStreams FSM > - > > Key: KAFKA-12550 > URL: https://issues.apache.org/jira/browse/KAFKA-12550 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Major > Labels: needs-kip > > We should consider adding a new state to the KafkaStreams FSM: RESTORING > This would cover the time between the completion of a stable rebalance and > the completion of restoration across the client. Currently, Streams will > report the state during this time as REBALANCING even though it is generally > spending much more time restoring than rebalancing in most cases. > There are a few motivations/benefits behind this idea: > # Observability is a big one: using the umbrella REBALANCING state to cover > all aspects of rebalancing -> task initialization -> restoring has been a > common source of confusion in the past. It’s also proved to be a time sink > for us, during escalations, incidents, mailing list questions, and bug > reports. 
It often adds latency to escalations in particular as we have to go > through GTS and wait for the customer to clarify whether their “Kafka Streams > is stuck rebalancing” ticket means that it’s literally rebalancing, or just > in the REBALANCING state and actually stuck elsewhere in Streams > # Prereq for global thread improvements: for example [KIP-406: > GlobalStreamThread should honor custom reset policy > |https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy] > was ultimately blocked on this as we needed to pause the Streams app while > the global thread restored from the appropriate offset. Since there’s > absolutely no rebalancing involved in this case, piggybacking on the > REBALANCING state would just be shooting ourselves in the foot. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-12550) Introduce RESTORING state to the KafkaStreams FSM
[ https://issues.apache.org/jira/browse/KAFKA-12550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-12550: --- Fix Version/s: (was: 4.0.0) > Introduce RESTORING state to the KafkaStreams FSM > - > > Key: KAFKA-12550 > URL: https://issues.apache.org/jira/browse/KAFKA-12550 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Major > Labels: needs-kip > > We should consider adding a new state to the KafkaStreams FSM: RESTORING > This would cover the time between the completion of a stable rebalance and > the completion of restoration across the client. Currently, Streams will > report the state during this time as REBALANCING even though it is generally > spending much more time restoring than rebalancing in most cases. > There are a few motivations/benefits behind this idea: > # Observability is a big one: using the umbrella REBALANCING state to cover > all aspects of rebalancing -> task initialization -> restoring has been a > common source of confusion in the past. It’s also proved to be a time sink > for us, during escalations, incidents, mailing list questions, and bug > reports. It often adds latency to escalations in particular as we have to go > through GTS and wait for the customer to clarify whether their “Kafka Streams > is stuck rebalancing” ticket means that it’s literally rebalancing, or just > in the REBALANCING state and actually stuck elsewhere in Streams > # Prereq for global thread improvements: for example [KIP-406: > GlobalStreamThread should honor custom reset policy > |https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy] > was ultimately blocked on this as we needed to pause the Streams app while > the global thread restored from the appropriate offset. 
Since there’s > absolutely no rebalancing involved in this case, piggybacking on the > REBALANCING state would just be shooting ourselves in the foot. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15463) StreamsException: Accessing from an unknown node
[ https://issues.apache.org/jira/browse/KAFKA-15463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-15463. Resolution: Not A Problem > StreamsException: Accessing from an unknown node > - > > Key: KAFKA-15463 > URL: https://issues.apache.org/jira/browse/KAFKA-15463 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.2.1 >Reporter: Yevgeny >Priority: Major > > After some time application was working fine, starting to get: > > This is springboot application runs in kubernetes as stateful pod. > > > > {code:java} > Exception in thread > "-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1" > org.apache.kafka.streams.errors.StreamsException: Accessing from an unknown > node at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:162) > at myclass1.java:28) at myclass2.java:48) at > java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90) at > java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1602) > at > java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129) > at > java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527) > at > java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513) > at > java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) > at > java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230) > at > java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196) > at > java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.base/java.util.stream.ReferencePipeline.allMatch(ReferencePipeline.java:637) > at myclass3.java:48) at > org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49) > at > 
org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38) > at > org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213) > at > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) > at > org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:780) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809) > at > org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:780) > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:711) > at > org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100) > at > org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81) > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:589) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:551) > {code} > > stream-thread > [-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1] State > transition from PENDING_SHUTDOWN to DEAD > > > Transformer is Prototype bean, the supplier supplys new instance of the > 
Transformer: > > > {code:java} > @Override public Transformer> get() > { return ctx.getBean(MyTransformer.class); }{code} > > > The only way to recover is to delete all topics used by kafkastreams, even if > application restarted same exception is thrown. > *If messages in internal topics of 'store-changelog' are deleted/offset > manipulated, can it cause the issue? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15463) StreamsException: Accessing from an unknown node
[ https://issues.apache.org/jira/browse/KAFKA-15463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779715#comment-17779715 ] A. Sophie Blee-Goldman commented on KAFKA-15463: [~yevsh] Well the intended pattern is to get the StateStore from the context during init and then save a reference to the store for your processor/transformer. That's why we pass the context in to #init but not to #process (also, while it's not a significant overhead, it's definitely more efficient to do the state store lookup only once, during init, and not on every single record that's processed) That said, I think it's a fair question as to why this is causing an error, and why the error doesn't happen every time. The fact that it generally only appears when you have more than one partition/task per application instance makes me think there is some state that is somehow being shared between the different tasks. This would definitely be explained by returning the same transformer instance each time, but based on your latest update, you are definitely returning a new transformer each time and still seeing the issue, right? Assuming so, I'm inclined to believe the issue is with Spring. I vaguely recall a similar problem being reported in the past, which was ultimately because a Spring object was unexpectedly/unknowingly acting as a static/singleton class. This was resulting in the context – of which there is supposed to be exactly one per task – being shared between different instances of the task/processor nodes. I'm pretty sure that's what is going on here as well. I'm guessing that the MyService class is a Spring bean? If so, then it's effectively a singleton and will be shared by each of the individual Transformer instances in your application, meaning the different tasks will be overwriting each others context when invoking #init on this transformer. 
So the context you retrieve from the myServiceBean during #process may not be the same as the context you saved to it in #init, causing it to throw this error since only the context corresponding to the task that is currently being processed will have the currentNode set to a non-null value. Even if you made the change I originally suggested but saved the StateStore reference by passing it to the MyService bean, it wouldn't work – it might not throw this error but you would potentially be reading and writing to the wrong copy of a state store for a given task, which is even worse. The only way to solve this is by removing the Spring bean entirely or at least refactoring it so that it doesn't hold any internal state and has to have the full application state for that task passed in to it every time – in other words you just need to make sure to keep all the objects used by a given transformer completely local to that instance. Here is my recommendation for how to implement your transformer class – hope this helps!
{code:java}
private final MyService myServiceBean;
private StateStore myStore;

@Override
public void init(ProcessorContext context) {
    myStore = context.getStateStore(STORE_NAME);
}

@Override
public KeyValue transform(String key, MyItem myItem) {
    return myServiceBean.process(myItem, myStore);
}
{code}
Basically modify the MyService bean to accept the StateStore to operate on as a parameter to its #process method. And definitely keep the fix in which you return a new Transformer instance each time instead of reusing the same one. Let me know if you have any questions! I'm going to close the ticket since I'm fairly confident in this solution having seen the same problem before, but definitely please do reopen it if you implement the suggested fix and still encounter an error. Good luck! 
> StreamsException: Accessing from an unknown node > - > > Key: KAFKA-15463 > URL: https://issues.apache.org/jira/browse/KAFKA-15463 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.2.1 >Reporter: Yevgeny >Priority: Major > > After some time application was working fine, starting to get: > > This is springboot application runs in kubernetes as stateful pod. > > > > {code:java} > Exception in thread > "-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1" > org.apache.kafka.streams.errors.StreamsException: Accessing from an unknown > node at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:162) > at myclass1.java:28) at myclass2.java:48) at > java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90) at > java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1602) > at > java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129) > at > java.bas
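The shared-singleton failure mode described in this comment can be sketched in plain Java, with no Kafka or Spring dependencies. All class names below are hypothetical stand-ins (not the real Streams types); the point is only that every transformer instance writes the one field of the shared bean:

```java
// Hypothetical stand-in for the per-task ProcessorContext.
class TaskContext {
    final String taskId;
    TaskContext(String taskId) { this.taskId = taskId; }
}

// Anti-pattern: a singleton service (e.g. a Spring bean) caching the context.
class SharedService {
    TaskContext context; // overwritten by whichever task ran init() last
}

class MyTransformer {
    private final SharedService service;
    MyTransformer(SharedService service) { this.service = service; }
    void init(TaskContext ctx) { service.context = ctx; } // every instance writes the same field
    String currentTask() { return service.context.taskId; } // may now belong to another task
}

public class SingletonContextDemo {
    public static void main(String[] args) {
        SharedService singleton = new SharedService(); // shared by all transformer instances
        MyTransformer t1 = new MyTransformer(singleton);
        MyTransformer t2 = new MyTransformer(singleton);
        t1.init(new TaskContext("0_1"));
        t2.init(new TaskContext("0_2")); // clobbers task 0_1's context
        System.out.println(t1.currentTask()); // prints 0_2, not 0_1
    }
}
```

In the real application the mix-up is between per-task ProcessorContext instances rather than strings, but the mechanism is the same: task 0_1 ends up reading state that belongs to task 0_2.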
[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with "{statestore.cache}/{input.buffer}.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779695#comment-17779695 ] A. Sophie Blee-Goldman commented on KAFKA-13152: Hey [~sagarrao] – I'm happy to try and help you get this into 3.7. As a disclaimer I do have a few other KIPs I've already promised to help land in 3.7 so this would be 3rd on my list, but we still have a good amount of time and given the KIP itself is already accepted, I think we can make it. Just give me a ping on the PR when it's ready for me to take a look. And just to refresh my memory, all the caching work – both config and metrics – is already merged, so the only thing remaining is to add (and implement) the new input.buffer.max.bytes config. Does that sound right? > Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with > "{statestore.cache}/{input.buffer}.max.bytes" > - > > Key: KAFKA-13152 > URL: https://issues.apache.org/jira/browse/KAFKA-13152 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Sagar Rao >Priority: Major > Labels: kip > Fix For: 3.7.0 > > > The current config "buffered.records.per.partition" controls how many records > at maximum to bookkeep, and hence when it is exceeded we would pause fetching from > this partition. However this config has two issues: > * It's a per-partition config, so the total memory consumed is dependent on > the dynamic number of partitions assigned. > * Record size could vary from case to case. > And hence it's hard to bound the memory usage for this buffering. We should > consider deprecating that config with a global one, e.g. "input.buffer.max.bytes", > which controls how many bytes in total are allowed to be buffered. This is > doable since we buffer the raw records in . -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15571) StateRestoreListener#onRestoreSuspended is never called because wrapper DelegatingStateRestoreListener doesn't implement onRestoreSuspended
[ https://issues.apache.org/jira/browse/KAFKA-15571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-15571: --- Fix Version/s: 3.5.2 3.7.0 3.6.1 > StateRestoreListener#onRestoreSuspended is never called because wrapper > DelegatingStateRestoreListener doesn't implement onRestoreSuspended > --- > > Key: KAFKA-15571 > URL: https://issues.apache.org/jira/browse/KAFKA-15571 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.5.0, 3.6.0, 3.5.1 >Reporter: Levani Kokhreidze >Assignee: Levani Kokhreidze >Priority: Major > Fix For: 3.5.2, 3.7.0, 3.6.1 > > > With https://issues.apache.org/jira/browse/KAFKA-10575 > `StateRestoreListener#onRestoreSuspended` was added. But local tests show > that it is never called because `DelegatingStateRestoreListener` was not > updated to call a new method. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15571) StateRestoreListener#onRestoreSuspended is never called because wrapper DelegatingStateRestoreListener doesn't implement onRestoreSuspended
[ https://issues.apache.org/jira/browse/KAFKA-15571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-15571. Resolution: Fixed > StateRestoreListener#onRestoreSuspended is never called because wrapper > DelegatingStateRestoreListener doesn't implement onRestoreSuspended > --- > > Key: KAFKA-15571 > URL: https://issues.apache.org/jira/browse/KAFKA-15571 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.5.0, 3.6.0, 3.5.1 >Reporter: Levani Kokhreidze >Assignee: Levani Kokhreidze >Priority: Major > > With https://issues.apache.org/jira/browse/KAFKA-10575 > `StateRestoreListener#onRestoreSuspended` was added. But local tests show > that it is never called because `DelegatingStateRestoreListener` was not > updated to call a new method. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15520) Kafka Streams Stateful Aggregation Rebalancing causing processing to pause on all partitions
[ https://issues.apache.org/jira/browse/KAFKA-15520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17770635#comment-17770635 ] A. Sophie Blee-Goldman commented on KAFKA-15520: {quote}Also added props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName()); to enable CooperativeStickyAssignor {quote} To clarify, Streams will always override and ignore this setting because it needs to plug in the StreamsPartitionAssignor – so you don't need this, at least not for any Streams applications. It should use cooperative rebalancing by default as of version 2.4 and above. I guess the one thing to watch out for is that cooperative rebalancing will not be enabled if you upgrade from a lower version and forget/skip the step to remove the StreamsConfig.UPGRADE_FROM property. So just make sure that's not being set anywhere (at least, not set to a version that's lower than 2.4) > Kafka Streams Stateful Aggregation Rebalancing causing processing to pause on > all partitions > > > Key: KAFKA-15520 > URL: https://issues.apache.org/jira/browse/KAFKA-15520 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.2 >Reporter: Rohit Bobade >Priority: Major > > Kafka broker version: 2.8.0 Kafka Streams client version: 2.6.2 > I am running kafka streams stateful aggregations on K8s statefulset with > persistent volume attached to each pod. I have also specified > props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, podName); > which makes sure it gets the sticky partition assignment. > Enabled standby replica - > props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); > and set props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, "0"); > However, I'm seeing that when pods restart - it triggers rebalances and > causes processing to be paused on all pods till the rebalance and state > restore is in progress. 
> My understanding is that even if there is a rebalance - only the partitions > that should be moved around will be restored in a cooperative way and not > pause all the processing. Also, it should failover to standby replica in this > case and avoid state restoring on other pods. > I have increased session timeout to 480 seconds and max poll interval to 15 > mins to minimize rebalances. > Also added > props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, > CooperativeStickyAssignor.class.getName()); > to enable CooperativeStickyAssignor > could someone please help if I'm missing something? > -- This message was sent by Atlassian Jira (v8.20.10#820010)
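The configuration advice in the comment above can be summarized in a small sketch using plain java.util.Properties with string config keys. The application id "my-app" is a made-up placeholder, and whether these particular values suit a given deployment is an assumption:

```java
import java.util.Properties;

public class StreamsFailoverConfig {
    // Sketch of the settings discussed above; "my-app" is a placeholder.
    public static Properties build(String podName) {
        Properties props = new Properties();
        props.put("application.id", "my-app");
        props.put("group.instance.id", podName);   // static membership -> sticky assignment
        props.put("num.standby.replicas", "1");    // warm standby for faster failover
        props.put("acceptable.recovery.lag", "0"); // only fully caught-up clients get the task
        // Deliberately NOT setting partition.assignment.strategy: Kafka Streams
        // always overrides it with its own StreamsPartitionAssignor, which is
        // cooperative by default since 2.4.
        // If upgrade.from was set during a rolling upgrade from a pre-2.4 version,
        // it must be removed afterwards or cooperative rebalancing stays disabled.
        props.remove("upgrade.from");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build("pod-0").getProperty("group.instance.id"));
    }
}
```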
[jira] [Commented] (KAFKA-15463) StreamsException: Accessing from an unknown node
[ https://issues.apache.org/jira/browse/KAFKA-15463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17768141#comment-17768141 ] A. Sophie Blee-Goldman commented on KAFKA-15463: [~yevsh] try moving the KeyValueStore store = context.getStateStore("storeName"); from the #process method to the #init method of your processor > StreamsException: Accessing from an unknown node > - > > Key: KAFKA-15463 > URL: https://issues.apache.org/jira/browse/KAFKA-15463 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.2.1 >Reporter: Yevgeny >Priority: Major > > After some time application was working fine, starting to get: > > This is springboot application runs in kubernetes as stateful pod. > > > > {code:java} > Exception in thread > "-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1" > org.apache.kafka.streams.errors.StreamsException: Accessing from an unknown > node at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:162) > at myclass1.java:28) at myclass2.java:48) at > java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90) at > java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1602) > at > java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129) > at > java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527) > at > java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513) > at > java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) > at > java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230) > at > java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196) > at > java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.base/java.util.stream.ReferencePipeline.allMatch(ReferencePipeline.java:637) > at 
myclass3.java:48) at > org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49) > at > org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38) > at > org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213) > at > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) > at > org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:780) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809) > at > org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:780) > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:711) > at > org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100) > at > org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81) > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:589) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:551) > {code} > > stream-thread > 
[-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1] State > transition from PENDING_SHUTDOWN to DEAD > > > Transformer is Prototype bean, the supplier supplys new instance of the > Transformer: > > > {code:java} > @Override public Transformer> get() > { return ctx.getBean(MyTransformer.class); }{code} > > > The only way to recover is to delete all topics used by kafkastreams, even if > application restarted same exception is thrown. > *If messages in internal topics of 'store-changelog' are deleted/offset > manipulated, can it cause the issue? -- This message was sent by Atlassian Jira (v8.20.10#820010)
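The suggestion in the comment above – do the store lookup once in #init and only use the saved reference in #process – can be sketched with plain-Java stand-ins (hypothetical classes, not the real Streams API, where the store is a Map for simplicity):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ProcessorContext; the point is only *when*
// getStateStore is called, not what the store actually is.
class Context {
    private final Map<String, Map<String, String>> stores = new HashMap<>();
    Context() { stores.put("storeName", new HashMap<>()); }
    Map<String, String> getStateStore(String name) { return stores.get(name); }
}

class FixedTransformer {
    private Map<String, String> store; // looked up once, kept per instance

    void init(Context context) {
        store = context.getStateStore("storeName"); // lookup belongs here...
    }

    void process(String key, String value) {
        store.put(key, value); // ...so process() only uses the saved reference
    }
}

public class InitLookupDemo {
    public static void main(String[] args) {
        Context ctx = new Context();
        FixedTransformer t = new FixedTransformer();
        t.init(ctx); // called once per task, before any records
        t.process("k", "v");
        System.out.println(ctx.getStateStore("storeName").get("k"));
    }
}
```

Besides avoiding the "unknown node" error, this also does the store lookup once per task instead of once per record.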
[jira] [Commented] (KAFKA-14294) Kafka Streams should commit transaction when no records are processed
[ https://issues.apache.org/jira/browse/KAFKA-14294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17760632#comment-17760632 ] A. Sophie Blee-Goldman commented on KAFKA-14294: I'm not sure that workaround would help, as it follows the same mechanism for requesting a commit as the actual invocation of the punctuator, which is what is "broken" by this bug. Specifically, both a punctuator invocation and the `context.commit` API just set the `commitNeeded` flag. However the bug here is that this flag is never checked at all, and we skip over the commit entirely when there are no new input records processed. The only guarantee is really to upgrade to a version with the fix, ie 3.4 or above > Kafka Streams should commit transaction when no records are processed > - > > Key: KAFKA-14294 > URL: https://issues.apache.org/jira/browse/KAFKA-14294 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 3.1.0, 3.2.1 >Reporter: Vicky Papavasileiou >Assignee: A. Sophie Blee-Goldman >Priority: Major > Fix For: 3.4.0 > > Attachments: image-2023-02-08-10-22-20-456.png > > > Currently, if there are no records to process in the input topic, a > transaction does not commit. If a custom punctuator code is writing to a > state store (which is common practice) the producer gets fenced when trying > to write to the changelog topic. This throws a TaskMigratedException and > causes a rebalance. > A better approach would be to commit a transaction even when there are no > records processed as to allow the punctuator to make progress. -- This message was sent by Atlassian Jira (v8.20.10#820010)
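The commit mechanics described in the comment above can be modeled in a few lines of plain Java (all names hypothetical, a deliberately simplified sketch of the pre-3.4 bug rather than the real StreamTask code): both a punctuator and context.commit() merely set a flag, and the buggy path never consults the flag when no input records were processed.

```java
class TaskModel {
    boolean commitNeeded = false;
    int commits = 0;

    void punctuate() { commitNeeded = true; } // same effect as context.commit()

    void maybeCommit(int recordsProcessed, boolean preFixBehavior) {
        if (preFixBehavior && recordsProcessed == 0) {
            return; // bug: the flag is never even checked, nothing is committed
        }
        if (commitNeeded) {
            commits++;
            commitNeeded = false;
        }
    }
}

public class CommitFlagDemo {
    public static void main(String[] args) {
        TaskModel task = new TaskModel();
        task.punctuate();            // punctuator requested a commit...
        task.maybeCommit(0, true);   // ...but no input records, so it is dropped
        System.out.println(task.commits); // still 0
        task.maybeCommit(0, false);  // 3.4+ behavior: flag honored regardless
        System.out.println(task.commits); // now 1
    }
}
```

This is why the workaround of calling context.commit() from the punctuator does not help: it feeds the same flag that the buggy code path ignores.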
[jira] [Updated] (KAFKA-14294) Kafka Streams should commit transaction when no records are processed
[ https://issues.apache.org/jira/browse/KAFKA-14294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-14294: --- Affects Version/s: 3.1.0 > Kafka Streams should commit transaction when no records are processed > - > > Key: KAFKA-14294 > URL: https://issues.apache.org/jira/browse/KAFKA-14294 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 3.1.0, 3.2.1 >Reporter: Vicky Papavasileiou >Assignee: A. Sophie Blee-Goldman >Priority: Major > Fix For: 3.4.0 > > Attachments: image-2023-02-08-10-22-20-456.png > > > Currently, if there are no records to process in the input topic, a > transaction does not commit. If a custom punctuator code is writing to a > state store (which is common practice) the producer gets fenced when trying > to write to the changelog topic. This throws a TaskMigratedException and > causes a rebalance. > A better approach would be to commit a transaction even when there are no > records processed as to allow the punctuator to make progress. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15308) Wipe Stores upon OffsetOutOfRangeException in ALOS
[ https://issues.apache.org/jira/browse/KAFKA-15308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reassigned KAFKA-15308: -- Assignee: Rohan Desai > Wipe Stores upon OffsetOutOfRangeException in ALOS > -- > > Key: KAFKA-15308 > URL: https://issues.apache.org/jira/browse/KAFKA-15308 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.0 >Reporter: Colt McNealy >Assignee: Rohan Desai >Priority: Minor > > As per this [Confluent Community Slack > Thread|https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1690843733272449?thread_ts=1690663361.858559&cid=C48AHTCUQ], > Streams currently does not wipe away RocksDB state upon encountering an > `OffsetOutOfRangeException` in ALOS. > > `OffsetOutOfRangeException` is a rare case that occurs when a standby task > requests offsets that no longer exist in the topic. We should wipe the store > for three reasons: > # Not wiping the store can be a violation of ALOS since some of the > now-missing offsets could have contained tombstone records. > # Wiping the store has no performance cost since we need to replay the > entirety of what's in the changelog topic anyways. > # I have heard (not yet confirmed myself) that we wipe the store in EOS > anyways, so fixing this bug could remove a bit of complexity from supporting > EOS and ALOS. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15297) Cache flush order might not be topological order
[ https://issues.apache.org/jira/browse/KAFKA-15297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17750495#comment-17750495 ] A. Sophie Blee-Goldman commented on KAFKA-15297: Were you able to (re)produce this issue? I'm a bit surprised because I always thought the state stores were maintained in strict topological order, both when building them initially and then when registering them. The stores are flushed in the order that they are registered, which corresponds to the order they are added to the topology, which in turn *should* reflect the topological order of the attached processor nodes. The original ordering comes from InternalTopologyBuilder#build and the topologically-sorted nodeFactories map. This builds up the stateStoreMap in topological order, no? > Cache flush order might not be topological order > - > > Key: KAFKA-15297 > URL: https://issues.apache.org/jira/browse/KAFKA-15297 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.4.0 >Reporter: Bruno Cadonna >Priority: Major > Attachments: minimal_example.png > > > The flush order of the state store caches in Kafka Streams might not > correspond to the topological order of the state stores in the topology. The > order depends on how the processors and state stores are added to the > topology. > In some cases downstream state stores might be flushed before upstream state > stores. That means, that during a commit records in upstream caches might end > up in downstream caches that have already been flushed during the same > commit. If a crash happens at that point, those records in the downstream > caches are lost. Those records are lost for two reasons: > 1. Records in caches are only changelogged after they are flushed from the > cache. However, the downstream caches have already been flushed and they will > not be flushed again during the same commit. > 2. 
The offsets of the input records that caused the records that now are > blocked in the downstream caches are committed during the same commit and so > they will not be re-processed after the crash. > An example for a topology where the flush order of the caches is wrong is the > following:
> {code:java}
> final String inputTopic1 = "inputTopic1";
> final String inputTopic2 = "inputTopic2";
> final String outputTopic1 = "outputTopic1";
> final String processorName = "processor1";
> final String stateStoreA = "stateStoreA";
> final String stateStoreB = "stateStoreB";
> final String stateStoreC = "stateStoreC";
> streamsBuilder.stream(inputTopic2, Consumed.with(Serdes.String(), Serdes.String()))
>     .process(
>         () -> new Processor<String, String, String, String>() {
>             private ProcessorContext<String, String> context;
>
>             @Override
>             public void init(ProcessorContext<String, String> context) {
>                 this.context = context;
>             }
>
>             @Override
>             public void process(Record<String, String> record) {
>                 context.forward(record);
>             }
>
>             @Override
>             public void close() {}
>         },
>         Named.as("processor1")
>     )
>     .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String()));
> streamsBuilder.stream(inputTopic1, Consumed.with(Serdes.String(), Serdes.String()))
>     .toTable(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreA).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
>     .mapValues(value -> value, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreB).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
>     .mapValues(value -> value, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreC).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
>     .toStream()
>     .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String()));
> final Topology topology = streamsBuilder.build(streamsConfiguration);
> topology.connectProcessorAndStateStores(processorName, stateStoreC);
> {code}
> This code results in the attached topology.
> In the topology {{processor1}} is connected to {{stateStoreC}}. 
If > {{processor1}} is added to the topology before the other processors, i.e., if > the right branch of the topology is added before the left branch as in the > code above, the cache of {{stateStoreC}} is flushed before the caches of > {{stateStoreA}} and {{stateStoreB}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
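The hazard described in this ticket can be modeled with a toy flush loop in plain Java (hypothetical names and a deliberately simplified model, not the real cache code): caches flush in registration order, and flushing an upstream cache forwards its records into the next cache downstream. Because the right branch registers stateStoreC first, C is flushed before A and B, so a record forwarded into C during the same commit is stranded there, unflushed and un-changelogged.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CacheFlushOrderDemo {
    public static void main(String[] args) {
        Map<String, List<String>> caches = new LinkedHashMap<>(); // registration order
        caches.put("stateStoreC", new ArrayList<>());             // right branch added first
        caches.put("stateStoreA", new ArrayList<>(List.of("record1")));
        caches.put("stateStoreB", new ArrayList<>());
        // Topological order of the stores is A -> B -> C:
        Map<String, String> downstream = Map.of("stateStoreA", "stateStoreB",
                                                "stateStoreB", "stateStoreC");
        for (String store : new ArrayList<>(caches.keySet())) {   // flush in registration order
            List<String> flushed = new ArrayList<>(caches.get(store));
            caches.get(store).clear();                            // flushed -> changelogged
            if (downstream.containsKey(store)) {
                caches.get(downstream.get(store)).addAll(flushed); // forward downstream
            }
        }
        System.out.println(caches.get("stateStoreC")); // record1 stranded this commit
    }
}
```

If a crash happens after this commit, the stranded record was neither changelogged nor will its input offset be re-processed, which is exactly the data loss the ticket describes.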
[jira] [Assigned] (KAFKA-14976) Left/outer stream-stream joins create KV stores that aren't customizable
[ https://issues.apache.org/jira/browse/KAFKA-14976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reassigned KAFKA-14976: -- Assignee: Almog Gavra > Left/outer stream-stream joins create KV stores that aren't customizable > > > Key: KAFKA-14976 > URL: https://issues.apache.org/jira/browse/KAFKA-14976 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Almog Gavra >Priority: Major > Labels: needs-kip > > It appears that we only give the illusion of full customizability when it > comes to the state stores of a windowed join. This arose due to an > [optimization|https://github.com/apache/kafka/pull/11252] for the performance > of the spurious results fix, and means that these joins now come with one > additional, and possibly unexpected, state store:
> {code:java}
> final StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder =
>     new ListValueStoreBuilder<>(
>         persistent ?
>             Stores.persistentKeyValueStore(storeName) :
>             Stores.inMemoryKeyValueStore(storeName),
>         timestampedKeyAndJoinSideSerde,
>         leftOrRightValueSerde,
>         Time.SYSTEM
>     );
> {code}
> where persistent is defined above that as
> {code:java}
> final boolean persistent = streamJoinedInternal.thisStoreSupplier() == null
>     || streamJoinedInternal.thisStoreSupplier().get().persistent();
> {code}
> This means regardless of whether a custom state store implementation was > passed in to the join, we will still insert one of our RocksDB or InMemory > state stores. Which might be very surprising since the API makes it seem like > the underlying stores are fully configurable. 
> I'm adding a warning line for this in PR [#13682|https://github.com/apache/kafka/pull/13682/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R334-R336] but we should really make this hidden state store fully configurable like the window stores currently are (which will require a KIP)
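The store-selection logic quoted above can be sketched in isolation. The following is a minimal, self-contained illustration of the behavior described in the ticket; `StoreSupplier` and the class/method names here are hypothetical stand-ins, not the real Kafka Streams types:

```java
// Hypothetical stand-in for the supplier types used by the join
// (not the actual Kafka Streams API).
interface StoreSupplier {
    boolean persistent();
}

public class JoinStoreChoiceSketch {
    // Mirrors: persistent = thisStoreSupplier == null
    //                       || thisStoreSupplier.get().persistent()
    static String internalStoreFor(StoreSupplier userSupplier) {
        final boolean persistent = userSupplier == null || userSupplier.persistent();
        // An internal store is created either way; the user's supplier only
        // influences whether that internal store is persistent or in-memory.
        return persistent ? "persistentKeyValueStore" : "inMemoryKeyValueStore";
    }

    public static void main(String[] args) {
        System.out.println(internalStoreFor(null));        // no custom supplier configured
        System.out.println(internalStoreFor(() -> false)); // custom in-memory supplier
    }
}
```

The point of the sketch is that the user's supplier never replaces the hidden store, it only toggles which built-in flavor gets created.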
[jira] [Updated] (KAFKA-14976) Left/outer stream-stream joins create KV stores that aren't customizable
[ https://issues.apache.org/jira/browse/KAFKA-14976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-14976: Labels: kip (was: needs-kip)
[jira] [Commented] (KAFKA-14976) Left/outer stream-stream joins create KV stores that aren't customizable
[ https://issues.apache.org/jira/browse/KAFKA-14976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17748314#comment-17748314 ] A. Sophie Blee-Goldman commented on KAFKA-14976: Will be addressed by https://cwiki.apache.org/confluence/display/KAFKA/KIP-954%3A+expand+default+DSL+store+configuration+to+custom+types
[jira] [Commented] (KAFKA-15190) Allow configuring a streams process ID
[ https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17746236#comment-17746236 ] A. Sophie Blee-Goldman commented on KAFKA-15190: [~jwreschnig] I'm sure we can find someone to pick up the KIP for a full-fledged fix like I proposed, so no worries – it seems reasonable to me for us to just reuse the static membership group.instance.id for the time being, if it's set, and punt on the generalized feature for now. Would you be interested in just doing a small PR for this case instead? Happy to review such a thing if so

> Allow configuring a streams process ID
>
> Key: KAFKA-15190
> URL: https://issues.apache.org/jira/browse/KAFKA-15190
> Project: Kafka
> Issue Type: Wish
> Components: streams
> Reporter: Joe Wreschnig
> Priority: Major
> Labels: needs-kip
>
> We run our Kafka Streams applications in containers with no persistent storage, and therefore the mitigation of persisting the process ID in the state directory in KAFKA-10716 does not help us avoid shuffling lots of tasks during restarts.
>
> However, we do have a persistent container ID (from a Kubernetes StatefulSet). Would it be possible to expose a configuration option to let us set the streams process ID ourselves?
>
> We are already using this ID as our group.instance.id - would it make sense to have the process ID be automatically derived from this (plus application/client IDs) if it's set? The two IDs seem to have overlapping goals of identifying "this consumer" across restarts.
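The interim approach floated in this comment (reuse group.instance.id as the process ID when static membership is configured, otherwise fall back to a fresh random ID) could be sketched roughly as below. The class and method names are illustrative only, not the actual Streams internals:

```java
import java.util.Optional;
import java.util.UUID;

// Illustrative sketch: derive a stable process ID from group.instance.id
// when static membership is configured; otherwise generate a random one,
// which is effectively what happens today on ephemeral storage.
public class ProcessIdSketch {
    static String processId(Optional<String> groupInstanceId) {
        return groupInstanceId.orElseGet(() -> UUID.randomUUID().toString());
    }

    public static void main(String[] args) {
        // With static membership (e.g. a StatefulSet pod name), the ID is
        // stable across restarts, so tasks need not shuffle:
        System.out.println(processId(Optional.of("my-app-pod-0")));
        // Without it, every restart yields a new ID:
        System.out.println(processId(Optional.empty()));
    }
}
```

The design question the thread raises remains: whether coupling this to static membership is acceptable, or whether a dedicated config is needed for the general case.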
[jira] [Created] (KAFKA-15215) The default.dsl.store config is not compatible with custom state stores
A. Sophie Blee-Goldman created KAFKA-15215: Summary: The default.dsl.store config is not compatible with custom state stores Key: KAFKA-15215 URL: https://issues.apache.org/jira/browse/KAFKA-15215 Project: Kafka Issue Type: New Feature Components: streams Reporter: A. Sophie Blee-Goldman Assignee: Almog Gavra

Sort of a bug, sort of a new/missing feature. When we added the long-awaited default.dsl.store config, it was decided to scope the initial KIP to just the two out-of-the-box state store types offered by Streams, rocksdb and in-memory. The reasoning was that this would address a large number of the relevant use cases, and could always be followed up with another KIP for custom state stores if/when the demand arose.

Of course, since rocksdb is the default anyways, the only beneficiaries of this KIP right now are the people who specifically want only in-memory stores – yet custom state store users are probably by far the ones with the greatest need for an easier way to configure the store type across an entire application. And unfortunately, because the config currently relies on enum definitions for the known OOTB store types, there's not really any way to extend this feature as it is to work with custom implementations.

I think this is a great feature, which is why I hope to see it extended to the broader user base. Most likely we'll want to introduce a new config for this, though whether it replaces the old default.dsl.store config or complements it will have to be decided during the KIP discussion
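The enum limitation described above can be made concrete. In the sketch below (all names hypothetical, not the actual Streams config machinery), an enum-valued config is a closed set that can never name a user's store implementation, whereas a class-name-valued config can be resolved reflectively to anything on the classpath:

```java
// Hypothetical contrast between the two config styles discussed above.
public class DslStoreConfigSketch {
    // Today's style: a closed set of known store types. A custom store
    // implementation simply has no corresponding constant.
    enum DslStoreType { ROCKS_DB, IN_MEMORY }

    static DslStoreType parseEnumConfig(String value) {
        // e.g. "in-memory" -> IN_MEMORY; anything else throws
        return DslStoreType.valueOf(value.toUpperCase().replace("-", "_"));
    }

    // A class-name-valued config, by contrast, can instantiate any
    // implementation on the classpath, including user-supplied ones.
    static Object instantiate(String className) throws Exception {
        return Class.forName(className).getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(parseEnumConfig("rocks-db"));
        System.out.println(instantiate("java.util.ArrayList").getClass().getName());
        // parseEnumConfig("my-custom-store") would throw IllegalArgumentException:
        // the enum has no way to represent it.
    }
}
```

This is essentially the shape of the follow-up that KIP-954 later pursued: moving from a fixed enumeration to a pluggable, class-based configuration.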
[jira] [Updated] (KAFKA-15215) The default.dsl.store config is not compatible with custom state stores
[ https://issues.apache.org/jira/browse/KAFKA-15215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-15215: Labels: needs-kip (was: )
[jira] [Updated] (KAFKA-15190) Allow configuring a streams process ID
[ https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-15190: Labels: needs-kip (was: kip)
[jira] [Updated] (KAFKA-15190) Allow configuring a streams process ID
[ https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-15190: Labels: kip (was: )
[jira] [Commented] (KAFKA-15190) Allow configuring a streams process ID
[ https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17743313#comment-17743313 ] A. Sophie Blee-Goldman commented on KAFKA-15190: I would be a bit hesitant to overly rely on the group.instance.id because not everyone uses or wants static membership, and that config is completely coupled to the feature. Perhaps we can reuse the group.instance.id as the process id only if/when static membership is already being used, which would not necessarily even require a KIP (maybe), but we'd still need to introduce a new config for the general use case. It's a bummer because of course, practically speaking, this new config would have exactly the same meaning as the group.instance.id – a unique, persistent identifier for each client. It would have been the perfect config for this use case if not for Kafka's habit of being overly clever about reusing configs to enable/disable the related feature, in addition to their actual usage.