[jira] [Updated] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task
[ https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-12679: --- Fix Version/s: 4.0.0 (was: 3.8.0) > Rebalancing a restoring or running task may cause directory livelocking with > newly created task > --- > > Key: KAFKA-12679 > URL: https://issues.apache.org/jira/browse/KAFKA-12679 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.1 > Environment: Broker and client version 2.6.1 > Multi-node broker cluster > Multi-node, auto scaling streams app instances >Reporter: Peter Nahas >Assignee: Lucas Brutschy >Priority: Major > Fix For: 4.0.0 > > Attachments: Backoff-between-directory-lock-attempts.patch > > > If a task that uses a state store is in the restoring state or in a running > state and the task gets rebalanced to a separate thread on the same instance, > the newly created task will attempt to lock the state store directory while > the first thread is continuing to use it. This is totally normal and expected > behavior when the first thread is not yet aware of the rebalance. However, > that newly created task is effectively running a while loop with no backoff > waiting to lock the directory: > # TaskManager tells the task to restore in `tryToCompleteRestoration` > # The task attempts to lock the directory > # The lock attempt fails and throws a > `org.apache.kafka.streams.errors.LockException` > # TaskManager catches the exception, stops further processing on the task > and reports that not all tasks have restored > # The StreamThread `runLoop` continues to run. > I've seen some documentation indicate that there is supposed to be a backoff > when this condition occurs, but there does not appear to be any in the code. > The result is that if this goes on for long enough, the lock-loop may > dominate CPU usage in the process and starve out the old stream thread task > processing. > > When in this state, the DEBUG level logging for TaskManager will produce a > steady stream of messages like the following: > {noformat} > 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager > : stream-thread [StreamThread-10] Could not initialize 0_34 due > to the following exception; will retry > org.apache.kafka.streams.errors.LockException: stream-thread > [StreamThread-10] standby-task [0_34] Failed to lock the state directory for > task 0_34 > {noformat} > > > I've attached a git formatted patch to resolve the issue. Simply detect the > scenario and sleep for the backoff time in the appropriate StreamThread. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
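The attached patch itself is not reproduced in this thread. As a rough illustration of the fix it describes (detect the failed lock attempt and sleep for a back-off interval instead of spinning), a minimal, self-contained sketch could look like the following; the lock interface and the back-off constant are illustrative placeholders, not actual Kafka Streams internals:

{code:java}
import java.time.Duration;

public class LockWithBackoff {

    // Illustrative back-off; the real value would come from Streams configuration.
    private static final Duration BACKOFF = Duration.ofMillis(100);

    // Stand-in for the state-directory lock held by the old StreamThread.
    interface StateDirLock {
        boolean tryLock();
    }

    static void lockOrBackoff(final StateDirLock lock) throws InterruptedException {
        while (!lock.tryLock()) {
            // The reported behavior is an immediate retry here, which starves the
            // thread that still holds the directory; sleeping yields the CPU between
            // attempts until the old thread releases the lock.
            Thread.sleep(BACKOFF.toMillis());
        }
    }
}
{code}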
[jira] [Commented] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task
[ https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17896322#comment-17896322 ] Lucas Brutschy commented on KAFKA-12679: We haven't seen live-locking with the state updater, but there is a regression to busy waiting on the state directory lock, creating a lot of noise. The fix is here: https://github.com/apache/kafka/pull/17209. I have set the fix version to 4.0.0. > Rebalancing a restoring or running task may cause directory livelocking with > newly created task > --- > > Key: KAFKA-12679 > URL: https://issues.apache.org/jira/browse/KAFKA-12679 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.1 > Environment: Broker and client version 2.6.1 > Multi-node broker cluster > Multi-node, auto scaling streams app instances >Reporter: Peter Nahas >Assignee: Lucas Brutschy >Priority: Major > Fix For: 4.0.0 > > Attachments: Backoff-between-directory-lock-attempts.patch > > > If a task that uses a state store is in the restoring state or in a running > state and the task gets rebalanced to a separate thread on the same instance, > the newly created task will attempt to lock the state store directory while > the first thread is continuing to use it. This is totally normal and expected > behavior when the first thread is not yet aware of the rebalance. However, > that newly created task is effectively running a while loop with no backoff > waiting to lock the directory: > # TaskManager tells the task to restore in `tryToCompleteRestoration` > # The task attempts to lock the directory > # The lock attempt fails and throws a > `org.apache.kafka.streams.errors.LockException` > # TaskManager catches the exception, stops further processing on the task > and reports that not all tasks have restored > # The StreamThread `runLoop` continues to run. > I've seen some documentation indicate that there is supposed to be a backoff > when this condition occurs, but there does not appear to be any in the code. > The result is that if this goes on for long enough, the lock-loop may > dominate CPU usage in the process and starve out the old stream thread task > processing. > > When in this state, the DEBUG level logging for TaskManager will produce a > steady stream of messages like the following: > {noformat} > 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager > : stream-thread [StreamThread-10] Could not initialize 0_34 due > to the following exception; will retry > org.apache.kafka.streams.errors.LockException: stream-thread > [StreamThread-10] standby-task [0_34] Failed to lock the state directory for > task 0_34 > {noformat} > > > I've attached a git formatted patch to resolve the issue. Simply detect the > scenario and sleep for the backoff time in the appropriate StreamThread. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16758) Extend Consumer#close with option to leave the group or not
[ https://issues.apache.org/jira/browse/KAFKA-16758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878220#comment-17878220 ] Lucas Brutschy edited comment on KAFKA-16758 at 8/30/24 6:19 PM: - Hey [~lianetm], are you still planning to do this? was (Author: JIRAUSER302322): Hey lianet, are you still planning to do this? > Extend Consumer#close with option to leave the group or not > --- > > Key: KAFKA-16758 > URL: https://issues.apache.org/jira/browse/KAFKA-16758 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: A. Sophie Blee-Goldman >Assignee: Lianet Magrans >Priority: Major > Labels: needs-kip > Fix For: 4.0.0 > > > See comments on https://issues.apache.org/jira/browse/KAFKA-16514 for the > full context. > Essentially we would get rid of the "internal.leave.group.on.close" config > that is used as a backdoor by Kafka Streams right now to prevent closed > consumers from leaving the group, thus reducing unnecessary task movements > after a simple bounce. > This would be replaced by an actual public API that would allow the caller to > opt in or out of the LeaveGroup when close is called. This would be similar > to the KafkaStreams#close(CloseOptions) API, and in fact would be how that > API will be implemented (since it only works for static groups at the moment > as noted in KAFKA-16514) > This has several benefits over the current situation: > # It allows plain consumer apps to opt out of leaving the group when closed, > which is currently not possible through any public API (only an internal > backdoor config) > # It enables the caller to dynamically select the appropriate action > depending on why the client is being closed – for example, you would not want > the consumer to leave the group during a simple restart, but would want it to > leave the group when shutting down the app or if scaling down the node. This > is not possible today, even with the internal config, since configs are > immutable > # It can be leveraged to "fix" the KafkaStreams#close(closeOptions) API so > that the user's choice to leave the group during close will be respected for > non-static members -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16758) Extend Consumer#close with option to leave the group or not
[ https://issues.apache.org/jira/browse/KAFKA-16758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878220#comment-17878220 ] Lucas Brutschy commented on KAFKA-16758: Hey lianet, are you still planning to do this? > Extend Consumer#close with option to leave the group or not > --- > > Key: KAFKA-16758 > URL: https://issues.apache.org/jira/browse/KAFKA-16758 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: A. Sophie Blee-Goldman >Assignee: Lianet Magrans >Priority: Major > Labels: needs-kip > Fix For: 4.0.0 > > > See comments on https://issues.apache.org/jira/browse/KAFKA-16514 for the > full context. > Essentially we would get rid of the "internal.leave.group.on.close" config > that is used as a backdoor by Kafka Streams right now to prevent closed > consumers from leaving the group, thus reducing unnecessary task movements > after a simple bounce. > This would be replaced by an actual public API that would allow the caller to > opt in or out of the LeaveGroup when close is called. This would be similar > to the KafkaStreams#close(CloseOptions) API, and in fact would be how that > API will be implemented (since it only works for static groups at the moment > as noted in KAFKA-16514) > This has several benefits over the current situation: > # It allows plain consumer apps to opt out of leaving the group when closed, > which is currently not possible through any public API (only an internal > backdoor config) > # It enables the caller to dynamically select the appropriate action > depending on why the client is being closed – for example, you would not want > the consumer to leave the group during a simple restart, but would want it to > leave the group when shutting down the app or if scaling down the node. This > is not possible today, even with the internal config, since configs are > immutable > # It can be leveraged to "fix" the KafkaStreams#close(closeOptions) API so > that the user's choice to leave the group during close will be respected for > non-static members -- This message was sent by Atlassian Jira (v8.20.10#820010)
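The KIP discussion above leaves the exact API shape open. Purely as a hypothetical sketch modeled on the existing KafkaStreams#close(CloseOptions), the consumer-side option object might look like this; the class name, builder methods, and close overload are all assumptions, not a committed API:

{code:java}
import java.time.Duration;
import java.util.Optional;

// Hypothetical option object; shape borrowed from KafkaStreams.CloseOptions.
public final class ConsumerCloseOptions {
    private boolean leaveGroup = true;
    private Optional<Duration> timeout = Optional.empty();

    // false keeps the member in the group across a simple bounce, replacing the
    // internal.leave.group.on.close backdoor config.
    public ConsumerCloseOptions leaveGroup(final boolean leaveGroup) {
        this.leaveGroup = leaveGroup;
        return this;
    }

    public ConsumerCloseOptions timeout(final Duration timeout) {
        this.timeout = Optional.of(timeout);
        return this;
    }
}
{code}

The intended call site would then be something like {{consumer.close(new ConsumerCloseOptions().leaveGroup(false).timeout(Duration.ofSeconds(30)))}}, chosen dynamically at shutdown time rather than fixed once via an immutable config.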
[jira] [Updated] (KAFKA-16336) Remove Deprecated metric standby-process-ratio
[ https://issues.apache.org/jira/browse/KAFKA-16336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-16336: --- Fix Version/s: (was: 4.0.0) > Remove Deprecated metric standby-process-ratio > -- > > Key: KAFKA-16336 > URL: https://issues.apache.org/jira/browse/KAFKA-16336 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Matthias J. Sax >Assignee: TengYao Chi >Priority: Blocker > > Metric "standby-process-ratio" was deprecated in 3.5 release via > https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17229) Multiple punctuators that together exceed the transaction timeout cause ProducerFencedException
[ https://issues.apache.org/jira/browse/KAFKA-17229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876628#comment-17876628 ] Lucas Brutschy commented on KAFKA-17229: Yes. This should be fixed in the non-processing thread flow. If this is fixed by the processing thread code (which is very possible, but I'm not sure myself since it has been a while), it was accidental, since we weren't aware that this specific issue existed in the current code path. It would be great, when fixing this in the current code path, to also include a small integration test for this case to make sure that we don't break this behavior once we start working on the processing threads again. > Multiple punctuators that together exceed the transaction timeout cause > ProducerFencedException > --- > > Key: KAFKA-17229 > URL: https://issues.apache.org/jira/browse/KAFKA-17229 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.8.0 >Reporter: Trevan Richins >Priority: Major > Attachments: always-forward-failure.log, topic-input-failure.log > > > If a single StreamThread has multiple punctuator tasks and the sum total of > their runtimes exceeds the transaction timeout setting, ProducerFencedExceptions will > occur. > For example, in my test case, I have an input topic with 10 partitions, a > processor with a punctuator that just sleeps for 5 seconds (the transaction > timeout is 10s so it finishes within the timeout), and an output topic. The > punctuators run every 30 seconds (wall clock). Once the app is running and > is inside one of the punctuators, I put one record in the input topic. The > punctuators will all finish and the record will be seen and read but it won't > commit because the punctuators run again (since it has been 30s since they > last started). After the punctuators finish this second time, it will try to > commit the transaction that it started 50 seconds ago and will trigger the > ProducerFencedException. > Another test case, with the same scenario, is having the punctuators forward > something. This also causes a ProducerFencedException because the first > punctuator starts a transaction but it doesn't commit the transaction till > all of the punctuators are done and that is long after the transaction > timeout. > The issue doesn't exist if there is only one partition as the single > punctuator will finish within the transaction timeout. It is only when there > are multiple punctuators that exceed the transaction timeout in total. > It feels like what is needed is for Kafka to check after each punctuator if > there is data that needs to be committed. If there is, it commits then. > > I've attached a log of the first test case. It is called > "topic-input-failure.log". It starts after the punctuators run the first > time. It shows the record being received and the transaction starting. Then > it runs the punctuators again and they each sleep for 5 seconds. Once they > are done, it triggers a ProducerFencedException. > I've attached a log for the second test case. It is called > "always-forward-failure.log". It starts when the punctuators run the first > time. It shows the punctuators forwarding a record and sleeping for 5 > seconds. In this case, only 5 punctuators run as a group. An > InvalidProducerEpochException occurs after the 5th punctuator finishes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
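For readers trying to reproduce the report, a minimal processor matching the described setup might look like the sketch below (an assumed reconstruction, not the reporter's actual code): with ten partitions on one StreamThread, ten of these punctuators run back to back, so 5-second sleeps that individually fit a 10s transaction.timeout.ms add up to roughly 50 seconds under EOS:

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class SleepyProcessor implements Processor<String, String, String, String> {

    @Override
    public void init(final ProcessorContext<String, String> context) {
        // Wall-clock punctuation every 30 seconds, as in the report.
        context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            try {
                Thread.sleep(5_000); // each punctuation alone fits the timeout; the sum does not
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    @Override
    public void process(final Record<String, String> record) {
        // Pass-through; triggering the failure only needs an open transaction
        // plus punctuators that together outlast the transaction timeout.
    }
}
{code}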
[jira] [Resolved] (KAFKA-16567) Add New Stream Metrics based on KIP-869
[ https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy resolved KAFKA-16567. Resolution: Fixed > Add New Stream Metrics based on KIP-869 > --- > > Key: KAFKA-16567 > URL: https://issues.apache.org/jira/browse/KAFKA-16567 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Walter Hernandez >Priority: Major > Labels: kip > > Add the following metrics to the state updater: > * restore-total > * restore-rate > * update-total > * update-rate > * restore-remaining-records-total > Please see the KIP for more information about the metrics. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16567) Add New Stream Metrics based on KIP-869
[ https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876618#comment-17876618 ] Lucas Brutschy commented on KAFKA-16567: [~balajirrao] I think you are right, this is already implemented. Thanks for digging out the corresponding PR. I will close this ticket. If there is anything missing, we should open a separate ticket, but from what I can see, this is indeed completely done already. > Add New Stream Metrics based on KIP-869 > --- > > Key: KAFKA-16567 > URL: https://issues.apache.org/jira/browse/KAFKA-16567 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Walter Hernandez >Priority: Major > Labels: kip > > Add the following metrics to the state updater: > * restore-total > * restore-rate > * update-total > * update-rate > * restore-remaining-records-total > Please see the KIP for more information about the metrics. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16290) Investigate propagating subscription state updates via queues
[ https://issues.apache.org/jira/browse/KAFKA-16290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17873966#comment-17873966 ] Lucas Brutschy commented on KAFKA-16290: Nice! > Investigate propagating subscription state updates via queues > - > > Key: KAFKA-16290 > URL: https://issues.apache.org/jira/browse/KAFKA-16290 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lucas Brutschy >Assignee: Lianet Magrans >Priority: Critical > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support > Fix For: 4.0.0 > > > We are mostly using the queues for interaction between application thread and > background thread, but the subscription object is shared between the threads, > and it is updated directly without going through the queues. > The way we allow updates to the subscription state from both threads is > definitely not right, and will bring trouble. Places like the assign() is > probably the most obvious, where we send an event to the background to > commit, but then update the subscription in the foreground right away. > It seems sensible to aim for having all updates to the subscription state in > the background, triggered from the app thread via events (and I think we > already have related events for all updates, just that the subscription state > was left out in the app thread). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17299) Kafka streams consumer stops consumption
[ https://issues.apache.org/jira/browse/KAFKA-17299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872077#comment-17872077 ] Lucas Brutschy commented on KAFKA-17299: Hey. The change looks safe to me. However, it would be good to understand why the deserialization error causes the consumer to not resume. Have you been able to determine the problem in the record accounting? > Kafka streams consumer stops consumption > > > Key: KAFKA-17299 > URL: https://issues.apache.org/jira/browse/KAFKA-17299 > Project: Kafka > Issue Type: Bug > Components: consumer, streams >Affects Versions: 3.6.2, 3.8.0, 3.7.1 >Reporter: Laxman >Priority: Major > > We are using kafka clients version (3.5.2). However, based on our code > review, the bug appears to exist in the current version as well. > In one of our clusters, Kafka Streams consumption completely stops and > doesn't recover even after restart of the consumer instance/pod. These are > our findings/observations from our debugging. > * We have some deserialisation errors while the streams app is consuming. > * We configured LogAndContinueExceptionHandler as exception handler to > handle deserialisation failures. > * Streams consumption doesn't stop on every deserialisation failure/error. > * We are noticing the consumption hangs, only when the first message in the > current batch is faulty and fails to deserialise. > We did a thorough inspection of the Kafka clients code and debugged by > patching with additional logs; we found the following lines of code from > StreamTask.java seem to be the issue. > *Original* - Original code > [StreamTask.java|https://github.com/apache/kafka/blob/3.5.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L750] > {code:java} > // after processing this record, if its partition queue's > buffered size has been > // decreased to the threshold, we can then resume the consumption > on this partition > if (recordInfo.queue().size() == maxBufferedSize) { > mainConsumer.resume(singleton(partition)); > } > {code} > > *Patched* - Issue resolved after this fix for us. > {code:java} > // after processing this record, if its partition queue's > buffered size has been > // decreased to the threshold, we can then resume the consumption > on this partition > if (recordInfo.queue().size() <= maxBufferedSize) { > mainConsumer.resume(singleton(partition)); > } > {code} > > We are resuming consumption only when the queue size exactly matches the max > buffered size. I think some record accounting has gone wrong, especially when > there is an issue with deserialising the first record in the batch. -- This message was sent by Atlassian Jira (v8.20.10#820010)
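Why the buffered size can skip the exact threshold in the deserialization-failure case is the open question above, but the effect of the two guards is easy to see in isolation. A toy illustration (plain Java, not Kafka code): if the size observed at the check ever steps from above {{maxBufferedSize}} to below it without landing exactly on it, the {{==}} guard never fires and the partition stays paused, while the {{<=}} guard still resumes it:

{code:java}
public class ResumeGuardDemo {
    public static void main(final String[] args) {
        final int maxBufferedSize = 5;
        // Sizes observed at the resume check; stepping by two (e.g. if more than
        // one record left the queue between checks) skips the exact value 5.
        for (int size = 6; size >= 2; size -= 2) {
            System.out.printf("size=%d  '==' resumes: %b  '<=' resumes: %b%n",
                    size, size == maxBufferedSize, size <= maxBufferedSize);
        }
    }
}
{code}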
[jira] [Created] (KAFKA-17125) Streams Rebalance Protocol
Lucas Brutschy created KAFKA-17125: -- Summary: Streams Rebalance Protocol Key: KAFKA-17125 URL: https://issues.apache.org/jira/browse/KAFKA-17125 Project: Kafka Issue Type: Task Components: streams Reporter: Lucas Brutschy This ticket tracks the implementation of the Streams Rebalance Protocol, as proposed in https://cwiki.apache.org/confluence/display/KAFKA/KIP-1071%3A+Streams+Rebalance+Protocol -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16599) LegacyConsumer: Always await async commit callbacks in commitSync and close
[ https://issues.apache.org/jira/browse/KAFKA-16599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-16599: --- Fix Version/s: (was: 3.8.0) > LegacyConsumer: Always await async commit callbacks in commitSync and close > --- > > Key: KAFKA-16599 > URL: https://issues.apache.org/jira/browse/KAFKA-16599 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > > The javadoc for KafkaConsumer.commitSync says: > {code:java} > Note that asynchronous offset commits sent previously with the {@link > #commitAsync(OffsetCommitCallback)} > (or similar) are guaranteed to have their callbacks invoked prior to > completion of this method. > {code} > This is not always true in the legacy consumer: when the set of offsets is > empty, the execution of the commit callback is not always awaited. There are > also various possible races that can prevent callback handler execution. > Similarly, there is code in the legacy consumer to await the completion of > the commit callback before closing; however, the code doesn't cover all cases, > and the behavior is therefore inconsistent. While the Javadoc doesn't > explicitly promise callback execution, it promises "completing commits", > which one would reasonably expect to include callback execution. Either way, > the current behavior of the legacy consumer is inconsistent. -- This message was sent by Atlassian Jira (v8.20.10#820010)
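For reference, the documented contract reads like this at the call site (a self-contained sketch; broker address, group, and topic are placeholders). The ticket is about the legacy consumer violating the final assertion when the async commit carried an empty offset set, or when one of the races mentioned above occurs:

{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CommitOrderingDemo {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "demo-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        final AtomicBoolean callbackRan = new AtomicBoolean(false);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("demo-topic"));
            consumer.poll(Duration.ofSeconds(1));
            consumer.commitAsync((offsets, exception) -> callbackRan.set(true));
            consumer.commitSync();
            // Per the javadoc, the async callback must already have been invoked here.
            System.out.println("callback ran before commitSync returned: " + callbackRan.get());
        }
    }
}
{code}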
[jira] [Comment Edited] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task
[ https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847334#comment-17847334 ] Lucas Brutschy edited comment on KAFKA-12679 at 5/17/24 2:26 PM: - Hey [~stoeckmk]. It is true that the state updater was not enabled in 3.7 in the end, so there is no back-off when locking the state directory. We missed updating the fix version on this ticket. I updated it now. Hey [~coltmcnealy-lh]. The change I implemented was to back off in case of a lock error, but essentially retry. You should see something like `"Encountered lock exception. Reattempting locking the state in the next iteration."` If you are completely stuck and the application makes zero progress, this does not seem like it would be solved by a back-off, and somehow that seems to describe a different problem, especially if it also happens with state updater enabled. We may want to create a separate ticket for this, since the problem described in this ticket should resolve itself once the old thread releases the lock on the state directory. was (Author: JIRAUSER302322): Hey [~coltmcnealy-lh]. It is true that the state updater was not enabled in 3.7 in the end, so there is no back-off when locking the state directory. The change I implemented was to back off in case of a lock error, to not end up in a busy loop trying to acquire the lock. If you are completely stuck and the application makes zero progress, this does not seem like it would be solved by a back-off, and somehow that seems to describe a different problem, especially if it also happens with state updater enabled. We may want to create a separate ticket for this, since the problem described in this ticket should resolve itself once the old thread releases the lock on the state directory. Either way, I'll update the fix version here. > Rebalancing a restoring or running task may cause directory livelocking with > newly created task > --- > > Key: KAFKA-12679 > URL: https://issues.apache.org/jira/browse/KAFKA-12679 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.1 > Environment: Broker and client version 2.6.1 > Multi-node broker cluster > Multi-node, auto scaling streams app instances >Reporter: Peter Nahas >Assignee: Lucas Brutschy >Priority: Major > Fix For: 3.8.0 > > Attachments: Backoff-between-directory-lock-attempts.patch > > > If a task that uses a state store is in the restoring state or in a running > state and the task gets rebalanced to a separate thread on the same instance, > the newly created task will attempt to lock the state store directory while > the first thread is continuing to use it. This is totally normal and expected > behavior when the first thread is not yet aware of the rebalance. However, > that newly created task is effectively running a while loop with no backoff > waiting to lock the directory: > # TaskManager tells the task to restore in `tryToCompleteRestoration` > # The task attempts to lock the directory > # The lock attempt fails and throws a > `org.apache.kafka.streams.errors.LockException` > # TaskManager catches the exception, stops further processing on the task > and reports that not all tasks have restored > # The StreamThread `runLoop` continues to run. > I've seen some documentation indicate that there is supposed to be a backoff > when this condition occurs, but there does not appear to be any in the code. 
> The result is that if this goes on for long enough, the lock-loop may > dominate CPU usage in the process and starve out the old stream thread task > processing. > > When in this state, the DEBUG level logging for TaskManager will produce a > steady stream of messages like the following: > {noformat} > 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager > : stream-thread [StreamThread-10] Could not initialize 0_34 due > to the following exception; will retry > org.apache.kafka.streams.errors.LockException: stream-thread > [StreamThread-10] standby-task [0_34] Failed to lock the state directory for > task 0_34 > {noformat} > > > I've attached a git formatted patch to resolve the issue. Simply detect the > scenario and sleep for the backoff time in the appropriate StreamThread. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task
[ https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847334#comment-17847334 ] Lucas Brutschy edited comment on KAFKA-12679 at 5/17/24 2:20 PM: - Hey [~coltmcnealy-lh]. It is true that the state updater was not enabled in 3.7 in the end, so there is no back-off when locking the state directory. The change I implemented was to back off in case of a lock error, to not end up in a busy loop trying to acquire the lock. If you are completely stuck and the application makes zero progress, this does not seem like it would be solved by a back-off, and somehow that seems to describe a different problem, especially if it also happens with state updater enabled. We may want to create a separate ticket for this, since the problem described in this ticket should resolve itself once the old thread releases the lock on the state directory. Either way, I'll update the fix version here. was (Author: JIRAUSER302322): Hey [~coltmcnealy-lh]. It is true that the state updater was not enabled in 3.7 in the end, so there is no back-off when locking the state directory. The change I implemented was to back off in case of a lock error, to not end up in a busy loop trying to acquire the lock. If you are completely stuck and the application makes zero progress, this does not seem like it would be solved by a back-off, and somehow that seems to describe a different problem, especially if it also happens with state updater enabled. We may want to create a separate ticket for this. Either way, I'll update the fix version here. > Rebalancing a restoring or running task may cause directory livelocking with > newly created task > --- > > Key: KAFKA-12679 > URL: https://issues.apache.org/jira/browse/KAFKA-12679 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.1 > Environment: Broker and client version 2.6.1 > Multi-node broker cluster > Multi-node, auto scaling streams app instances >Reporter: Peter Nahas >Assignee: Lucas Brutschy >Priority: Major > Fix For: 3.8.0 > > Attachments: Backoff-between-directory-lock-attempts.patch > > > If a task that uses a state store is in the restoring state or in a running > state and the task gets rebalanced to a separate thread on the same instance, > the newly created task will attempt to lock the state store directory while > the first thread is continuing to use it. This is totally normal and expected > behavior when the first thread is not yet aware of the rebalance. However, > that newly created task is effectively running a while loop with no backoff > waiting to lock the directory: > # TaskManager tells the task to restore in `tryToCompleteRestoration` > # The task attempts to lock the directory > # The lock attempt fails and throws a > `org.apache.kafka.streams.errors.LockException` > # TaskManager catches the exception, stops further processing on the task > and reports that not all tasks have restored > # The StreamThread `runLoop` continues to run. > I've seen some documentation indicate that there is supposed to be a backoff > when this condition occurs, but there does not appear to be any in the code. > The result is that if this goes on for long enough, the lock-loop may > dominate CPU usage in the process and starve out the old stream thread task > processing. 
> > When in this state, the DEBUG level logging for TaskManager will produce a > steady stream of messages like the following: > {noformat} > 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager > : stream-thread [StreamThread-10] Could not initialize 0_34 due > to the following exception; will retry > org.apache.kafka.streams.errors.LockException: stream-thread > [StreamThread-10] standby-task [0_34] Failed to lock the state directory for > task 0_34 > {noformat} > > > I've attached a git formatted patch to resolve the issue. Simply detect the > scenario and sleep for the backoff time in the appropriate StreamThread. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task
[ https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-12679: --- Fix Version/s: 3.8.0 (was: 3.7.0) > Rebalancing a restoring or running task may cause directory livelocking with > newly created task > --- > > Key: KAFKA-12679 > URL: https://issues.apache.org/jira/browse/KAFKA-12679 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.1 > Environment: Broker and client version 2.6.1 > Multi-node broker cluster > Multi-node, auto scaling streams app instances >Reporter: Peter Nahas >Assignee: Lucas Brutschy >Priority: Major > Fix For: 3.8.0 > > Attachments: Backoff-between-directory-lock-attempts.patch > > > If a task that uses a state store is in the restoring state or in a running > state and the task gets rebalanced to a separate thread on the same instance, > the newly created task will attempt to lock the state store directory while > the first thread is continuing to use it. This is totally normal and expected > behavior when the first thread is not yet aware of the rebalance. However, > that newly created task is effectively running a while loop with no backoff > waiting to lock the directory: > # TaskManager tells the task to restore in `tryToCompleteRestoration` > # The task attempts to lock the directory > # The lock attempt fails and throws a > `org.apache.kafka.streams.errors.LockException` > # TaskManager catches the exception, stops further processing on the task > and reports that not all tasks have restored > # The StreamThread `runLoop` continues to run. > I've seen some documentation indicate that there is supposed to be a backoff > when this condition occurs, but there does not appear to be any in the code. > The result is that if this goes on for long enough, the lock-loop may > dominate CPU usage in the process and starve out the old stream thread task > processing. > > When in this state, the DEBUG level logging for TaskManager will produce a > steady stream of messages like the following: > {noformat} > 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager > : stream-thread [StreamThread-10] Could not initialize 0_34 due > to the following exception; will retry > org.apache.kafka.streams.errors.LockException: stream-thread > [StreamThread-10] standby-task [0_34] Failed to lock the state directory for > task 0_34 > {noformat} > > > I've attached a git formatted patch to resolve the issue. Simply detect the > scenario and sleep for the backoff time in the appropriate StreamThread. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task
[ https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847334#comment-17847334 ] Lucas Brutschy commented on KAFKA-12679: Hey [~coltmcnealy-lh]. It is true that the state updater was not enabled in 3.7 in the end, so there is no back-off when locking the state directory. The change I implemented was to back off in case of a lock error, to not end up in a busy loop trying to acquire the lock. If you are completely stuck and the application makes zero progress, this does not seem like it would be solved by a back-off, and somehow that seems to describe a different problem, especially if it also happens with state updater enabled. We may want to create a separate ticket for this. Either way, I'll update the fix version here. > Rebalancing a restoring or running task may cause directory livelocking with > newly created task > --- > > Key: KAFKA-12679 > URL: https://issues.apache.org/jira/browse/KAFKA-12679 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.1 > Environment: Broker and client version 2.6.1 > Multi-node broker cluster > Multi-node, auto scaling streams app instances >Reporter: Peter Nahas >Assignee: Lucas Brutschy >Priority: Major > Fix For: 3.7.0 > > Attachments: Backoff-between-directory-lock-attempts.patch > > > If a task that uses a state store is in the restoring state or in a running > state and the task gets rebalanced to a separate thread on the same instance, > the newly created task will attempt to lock the state store directory while > the first thread is continuing to use it. This is totally normal and expected > behavior when the first thread is not yet aware of the rebalance. However, > that newly created task is effectively running a while loop with no backoff > waiting to lock the directory: > # TaskManager tells the task to restore in `tryToCompleteRestoration` > # The task attempts to lock the directory > # The lock attempt fails and throws a > `org.apache.kafka.streams.errors.LockException` > # TaskManager catches the exception, stops further processing on the task > and reports that not all tasks have restored > # The StreamThread `runLoop` continues to run. > I've seen some documentation indicate that there is supposed to be a backoff > when this condition occurs, but there does not appear to be any in the code. > The result is that if this goes on for long enough, the lock-loop may > dominate CPU usage in the process and starve out the old stream thread task > processing. > > When in this state, the DEBUG level logging for TaskManager will produce a > steady stream of messages like the following: > {noformat} > 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager > : stream-thread [StreamThread-10] Could not initialize 0_34 due > to the following exception; will retry > org.apache.kafka.streams.errors.LockException: stream-thread > [StreamThread-10] standby-task [0_34] Failed to lock the state directory for > task 0_34 > {noformat} > > > I've attached a git formatted patch to resolve the issue. Simply detect the > scenario and sleep for the backoff time in the appropriate StreamThread. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16103) Review client logic for triggering offset commit callbacks
[ https://issues.apache.org/jira/browse/KAFKA-16103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy resolved KAFKA-16103. Resolution: Fixed > Review client logic for triggering offset commit callbacks > -- > > Key: KAFKA-16103 > URL: https://issues.apache.org/jira/browse/KAFKA-16103 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lucas Brutschy >Priority: Critical > Labels: kip-848-client-support, offset > Fix For: 3.8.0 > > > Review logic for triggering commit callbacks, ensuring that all callbacks are > triggered before returning from commitSync -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16103) Review client logic for triggering offset commit callbacks
[ https://issues.apache.org/jira/browse/KAFKA-16103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839765#comment-17839765 ] Lucas Brutschy commented on KAFKA-16103: changes in legacy consumer in https://issues.apache.org/jira/browse/KAFKA-16599 > Review client logic for triggering offset commit callbacks > -- > > Key: KAFKA-16103 > URL: https://issues.apache.org/jira/browse/KAFKA-16103 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lucas Brutschy >Priority: Critical > Labels: kip-848-client-support, offset > Fix For: 3.8.0 > > > Review logic for triggering commit callbacks, ensuring that all callbacks are > triggered before returning from commitSync -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16599) LegacyConsumer: Always await async commit callbacks in commitSync and close
[ https://issues.apache.org/jira/browse/KAFKA-16599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-16599: --- Summary: LegacyConsumer: Always await async commit callbacks in commitSync and close (was: Always await async commit callbacks in commitSync and close) > LegacyConsumer: Always await async commit callbacks in commitSync and close > --- > > Key: KAFKA-16599 > URL: https://issues.apache.org/jira/browse/KAFKA-16599 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > > The javadoc for KafkaConsumer.commitSync says: > {code:java} > Note that asynchronous offset commits sent previously with the {@link > #commitAsync(OffsetCommitCallback)} > (or similar) are guaranteed to have their callbacks invoked prior to > completion of this method. > {code} > This is not always true in the legacy consumer: when the set of offsets is > empty, the execution of the commit callback is not always awaited. There are > also various possible races that can prevent callback handler execution. > Similarly, there is code in the legacy consumer to await the completion of > the commit callback before closing; however, the code doesn't cover all cases, > and the behavior is therefore inconsistent. While the Javadoc doesn't > explicitly promise callback execution, it promises "completing commits", > which one would reasonably expect to include callback execution. Either way, > the current behavior of the legacy consumer is inconsistent. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16599) Always await async commit callbacks in commitSync and close
[ https://issues.apache.org/jira/browse/KAFKA-16599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-16599: --- Component/s: clients consumer > Always await async commit callbacks in commitSync and close > --- > > Key: KAFKA-16599 > URL: https://issues.apache.org/jira/browse/KAFKA-16599 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > > The javadoc for KafkaConsumer.commitSync says: > {code:java} > Note that asynchronous offset commits sent previously with the {@link > #commitAsync(OffsetCommitCallback)} > (or similar) are guaranteed to have their callbacks invoked prior to > completion of this method. > {code} > This is not always true in the legacy consumer: when the set of offsets is > empty, the execution of the commit callback is not always awaited. There are > also various possible races that can prevent callback handler execution. > Similarly, there is code in the legacy consumer to await the completion of > the commit callback before closing; however, the code doesn't cover all cases, > and the behavior is therefore inconsistent. While the Javadoc doesn't > explicitly promise callback execution, it promises "completing commits", > which one would reasonably expect to include callback execution. Either way, > the current behavior of the legacy consumer is inconsistent. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16599) Always await async commit callbacks in commitSync and close
[ https://issues.apache.org/jira/browse/KAFKA-16599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-16599: --- Description: The javadoc for KafkaConsumer.commitSync says: {code:java} Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)} (or similar) are guaranteed to have their callbacks invoked prior to completion of this method. {code} This is not always true in the legacy consumer: when the set of offsets is empty, the execution of the commit callback is not always awaited. There are also various possible races that can prevent callback handler execution. Similarly, there is code in the legacy consumer to await the completion of the commit callback before closing; however, the code doesn't cover all cases, and the behavior is therefore inconsistent. While the Javadoc doesn't explicitly promise callback execution, it promises "completing commits", which one would reasonably expect to include callback execution. Either way, the current behavior of the legacy consumer is inconsistent. > Always await async commit callbacks in commitSync and close > --- > > Key: KAFKA-16599 > URL: https://issues.apache.org/jira/browse/KAFKA-16599 > Project: Kafka > Issue Type: Task >Reporter: Lucas Brutschy >Priority: Major > > The javadoc for KafkaConsumer.commitSync says: > {code:java} > Note that asynchronous offset commits sent previously with the {@link > #commitAsync(OffsetCommitCallback)} > (or similar) are guaranteed to have their callbacks invoked prior to > completion of this method. > {code} > This is not always true in the legacy consumer: when the set of offsets is > empty, the execution of the commit callback is not always awaited. There are > also various possible races that can prevent callback handler execution. > Similarly, there is code in the legacy consumer to await the completion of > the commit callback before closing; however, the code doesn't cover all cases, > and the behavior is therefore inconsistent. While the Javadoc doesn't > explicitly promise callback execution, it promises "completing commits", > which one would reasonably expect to include callback execution. Either way, > the current behavior of the legacy consumer is inconsistent. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16599) Always await async commit callbacks in commitSync and close
[ https://issues.apache.org/jira/browse/KAFKA-16599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-16599: -- Assignee: Lucas Brutschy > Always await async commit callbacks in commitSync and close > --- > > Key: KAFKA-16599 > URL: https://issues.apache.org/jira/browse/KAFKA-16599 > Project: Kafka > Issue Type: Task >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > > The javadoc for KafkaConsumer.commitSync says: > {code:java} > Note that asynchronous offset commits sent previously with the {@link > #commitAsync(OffsetCommitCallback)} > (or similar) are guaranteed to have their callbacks invoked prior to > completion of this method. > {code} > This is not always true in the legacy consumer: when the set of offsets is > empty, the execution of the commit callback is not always awaited. There are > also various possible races that can prevent callback handler execution. > Similarly, there is code in the legacy consumer to await the completion of > the commit callback before closing; however, the code doesn't cover all cases, > and the behavior is therefore inconsistent. While the Javadoc doesn't > explicitly promise callback execution, it promises "completing commits", > which one would reasonably expect to include callback execution. Either way, > the current behavior of the legacy consumer is inconsistent. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16599) Always await async commit callbacks in commitSync and close
Lucas Brutschy created KAFKA-16599: -- Summary: Always await async commit callbacks in commitSync and close Key: KAFKA-16599 URL: https://issues.apache.org/jira/browse/KAFKA-16599 Project: Kafka Issue Type: Task Reporter: Lucas Brutschy -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16298) Ensure user callbacks exceptions are propagated to the user on consumer poll
[ https://issues.apache.org/jira/browse/KAFKA-16298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-16298: -- Assignee: Lucas Brutschy (was: Kirk True) > Ensure user callbacks exceptions are propagated to the user on consumer poll > > > Key: KAFKA-16298 > URL: https://issues.apache.org/jira/browse/KAFKA-16298 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Assignee: Lucas Brutschy >Priority: Blocker > Labels: callback, kip-848-client-support > Fix For: 3.8.0 > > > When user-defined callbacks fail with an exception, the expectation is that > the error should be propagated to the user as a KafkaException and break the > poll loop (behaviour in the legacy coordinator). The new consumer executes > callbacks in the application thread, and sends an event to the background > with the callback result and error if any, [passing the error along with the > event > here|https://github.com/apache/kafka/blob/98a658f871fc2c533b16fb5fd567a5ceb1c340b7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1882] > to the background thread, but does not seem to propagate the exception to > the user. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16111) Implement tests for tricky rebalance callback scenarios
[ https://issues.apache.org/jira/browse/KAFKA-16111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-16111: -- Assignee: (was: Lucas Brutschy) > Implement tests for tricky rebalance callback scenarios > --- > > Key: KAFKA-16111 > URL: https://issues.apache.org/jira/browse/KAFKA-16111 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Priority: Major > Labels: callback, consumer-threading-refactor, integration-tests > Fix For: 3.8.0 > > > There is justified concern that the new threading model may not play well > with "tricky" {{ConsumerRebalanceListener}} callbacks. We need to provide > some assurance that it will support complicated patterns. > # Design and implement test scenarios > # Update and document any design changes with the callback sub-system where > needed > # Provide fix(es) to the {{AsyncKafkaConsumer}} implementation to abide by > said design -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16103) Review client logic for triggering offset commit callbacks
[ https://issues.apache.org/jira/browse/KAFKA-16103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-16103: -- Assignee: Lucas Brutschy (was: Lianet Magrans) > Review client logic for triggering offset commit callbacks > -- > > Key: KAFKA-16103 > URL: https://issues.apache.org/jira/browse/KAFKA-16103 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lucas Brutschy >Priority: Critical > Labels: kip-848-client-support, offset > Fix For: 3.8.0 > > > Review logic for triggering commit callbacks, ensuring that all callbacks are > triggered before returning from commitSync -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16275) Update kraft_upgrade_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17829893#comment-17829893 ] Lucas Brutschy commented on KAFKA-16275: You could start a new consumer after the upgrade, but that would be a different test case. I think it's fine to not port this test > Update kraft_upgrade_test.py to support KIP-848’s group protocol config > --- > > Key: KAFKA-16275 > URL: https://issues.apache.org/jira/browse/KAFKA-16275 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > This task is to update the test method(s) in {{kraft_upgrade_test.py}} to > support the {{group.protocol}} configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrixes. > See KAFKA-16231 as an example of how the test parameters can be changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16185) Fix client reconciliation of same assignment received in different epochs
[ https://issues.apache.org/jira/browse/KAFKA-16185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-16185: -- Assignee: Lucas Brutschy (was: Lianet Magrans) > Fix client reconciliation of same assignment received in different epochs > -- > > Key: KAFKA-16185 > URL: https://issues.apache.org/jira/browse/KAFKA-16185 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lucas Brutschy >Priority: Critical > Labels: client-transitions-issues, kip-848-client-support > Fix For: 3.8.0 > > > Currently, the intention in the client state machine is that the client > always reconciles whatever it has pending and sends an ack for it, but in > cases where the same assignment is received in different epochs this does not > work as expected. > 1 - Client might get stuck JOINING/RECONCILING, with a pending reconciliation > (delayed), and it receives the same assignment, but in a new epoch (ex. after > being FENCED). First time it receives the assignment it takes no action, as > it already has it as pending to reconcile, but when the reconciliation > completes it discards the result because the epoch changed. And this is > wrong. Note that after sending the assignment with the new epoch one time, > the broker continues to send null assignments. > Here is a sample sequence leading to the client stuck JOINING: > - client joins, epoch 0 > - client receives assignment tp1, stuck RECONCILING, epoch 1 > - member gets FENCED on the coord, coord bumps epoch to 2 > - client tries to rejoin (JOINING), epoch 0 provided by the client > - new member added to the group (group epoch bumped to 3), client receives > same assignment that is currently trying to reconcile (tp1), but with epoch 3 > - previous reconciliation completes, but will discard the result because it > will notice that the memberHasRejoined (memberEpochOnReconciliationStart != > memberEpoch). Client is stuck JOINING, with the server sending null target > assignment because it hasn't changed since the last one sent (tp1) > We should end up with a test similar to the existing > #testDelayedReconciliationResultDiscardedIfMemberRejoins but with the case > that the member receives the same assignment after being fenced and rejoining > 2 - Client is not sending ack back to the broker in cases where it finishes a > reconciliation for the same assignment that it sent in the last HB (builder > will not include the assignment). Following sequence: > - client owns T1-1 (last HB sent included ack for T1-1) > - client receives [T1-1, T2-1] and start reconciling > - client receives T1-1 (meaning T2-1 needs to be revoked) > - ongoing reconciliation for [T1-1, T2-1] fails so ack never sent for it > - next reconciliation starts for T1-1 and completes, but ack is not sent > because the builder sees it's the same it sent on the last HB, leaving the > broker waiting for an ack that won't arrive. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16261) MembershipManagerImpl.updateSubscription fails if already empty subscription
[ https://issues.apache.org/jira/browse/KAFKA-16261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-16261: -- Assignee: Lucas Brutschy (was: Lianet Magrans) > MembershipManagerImpl.updateSubscription fails if already empty subscription > > > Key: KAFKA-16261 > URL: https://issues.apache.org/jira/browse/KAFKA-16261 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Andrew Schofield >Assignee: Lucas Brutschy >Priority: Critical > Labels: client-transitions-issues, consumer-threading-refactor, > kip-848-client-support > Fix For: 3.8.0 > > > The internal SubscriptionState object keeps track of whether the assignment > is user-assigned, or auto-assigned. If there are no assigned partitions, the > assignment resets to NONE. If you call SubscriptionState.assignFromSubscribed > in this state it fails. > The easiest thing is perhaps to check > SubscriptionState.hasAutoAssignedPartitions() to make sure that > assignFromSubscribed is going to be permitted. -- This message was sent by Atlassian Jira (v8.20.10#820010)
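A self-contained mock (not the real client classes) of the failure mode and the suggested guard; the internal SubscriptionState tracks whether the assignment is none, user-assigned, or auto-assigned, and assignFromSubscribed rejects anything but the auto-assigned states:

{code:java}
import java.util.Set;

public class AssignFromSubscribedDemo {

    enum SubscriptionType { NONE, USER_ASSIGNED, AUTO_ASSIGNED }

    // Simplified stand-in for the internal SubscriptionState.
    static class MockSubscriptionState {
        SubscriptionType type = SubscriptionType.NONE; // resets to NONE once no partitions are assigned

        boolean hasAutoAssignedPartitions() {
            return type == SubscriptionType.AUTO_ASSIGNED;
        }

        void assignFromSubscribed(final Set<String> partitions) {
            if (!hasAutoAssignedPartitions()) {
                // This is the failure described in the ticket when the state is already NONE.
                throw new IllegalStateException("subscription state does not allow auto-assignment");
            }
            // ...record the new assignment...
        }
    }

    public static void main(final String[] args) {
        final MockSubscriptionState subscriptions = new MockSubscriptionState();
        // Suggested fix: guard the call instead of failing on the empty NONE state.
        if (subscriptions.hasAutoAssignedPartitions()) {
            subscriptions.assignFromSubscribed(Set.of("topic-0"));
        }
    }
}
{code}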
[jira] [Commented] (KAFKA-16008) Fix PlaintextConsumerTest.testMaxPollIntervalMs
[ https://issues.apache.org/jira/browse/KAFKA-16008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17820663#comment-17820663 ] Lucas Brutschy commented on KAFKA-16008: Fixed by https://issues.apache.org/jira/browse/KAFKA-16258 > Fix PlaintextConsumerTest.testMaxPollIntervalMs > --- > > Key: KAFKA-16008 > URL: https://issues.apache.org/jira/browse/KAFKA-16008 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Lucas Brutschy >Priority: Critical > Labels: consumer-threading-refactor, integration-tests, timeout > Fix For: 3.8.0 > > > The integration test {{PlaintextConsumerTest.testMaxPollIntervalMs}} is > failing when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Timed out before expected rebalance > completed > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317) > at > kafka.api.PlaintextConsumerTest.testMaxPollIntervalMs(PlaintextConsumerTest.scala:194) > {code} > The logs include this line: > > {code} > [2023-12-13 15:11:16,134] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16008) Fix PlaintextConsumerTest.testMaxPollIntervalMs
[ https://issues.apache.org/jira/browse/KAFKA-16008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy resolved KAFKA-16008. Resolution: Duplicate > Fix PlaintextConsumerTest.testMaxPollIntervalMs > --- > > Key: KAFKA-16008 > URL: https://issues.apache.org/jira/browse/KAFKA-16008 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Lucas Brutschy >Priority: Critical > Labels: consumer-threading-refactor, integration-tests, timeout > Fix For: 3.8.0 > > > The integration test {{PlaintextConsumerTest.testMaxPollIntervalMs}} is > failing when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Timed out before expected rebalance > completed > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317) > at > kafka.api.PlaintextConsumerTest.testMaxPollIntervalMs(PlaintextConsumerTest.scala:194) > {code} > The logs include this line: > > {code} > [2023-12-13 15:11:16,134] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16169) FencedException in commitAsync not propagated without callback
[ https://issues.apache.org/jira/browse/KAFKA-16169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-16169: -- Assignee: Lucas Brutschy (was: Lianet Magrans) > FencedException in commitAsync not propagated without callback > -- > > Key: KAFKA-16169 > URL: https://issues.apache.org/jira/browse/KAFKA-16169 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Critical > Labels: kip-848-client-support > Fix For: 3.8.0 > > > The javadocs for {{commitAsync()}} (w/o callback) say: > @throws org.apache.kafka.common.errors.FencedInstanceIdException if this > consumer instance gets fenced by broker. > > If no callback is passed into {{{}commitAsync(){}}}, no offset commit > callback invocation is submitted. However, we only check for a > {{FencedInstanceIdException}} when we execute a callback. It seems to me that > with {{commitAsync()}} we would not throw at all when the consumer gets > fenced. > In any case, we need a unit test that verifies that the > {{FencedInstanceIdException}} is thrown for each version of > {{{}commitAsync(){}}}. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
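One possible fix direction, sketched under the assumption that the fenced error can be recorded by an internally installed callback and rethrown on the caller's next invocation. This is not the actual patch; CommitAsyncSketch and its methods are invented names:
{code:java}
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.errors.FencedInstanceIdException;

// Sketch only: record the fenced error even when the user passed no callback,
// so the consumer can rethrow it on a later API call instead of swallowing it.
final class CommitAsyncSketch {
    private final AtomicReference<RuntimeException> fencedError = new AtomicReference<>();

    OffsetCommitCallback defaultCallback() {
        return (offsets, exception) -> {
            if (exception instanceof FencedInstanceIdException) {
                fencedError.compareAndSet(null, (FencedInstanceIdException) exception);
            }
        };
    }

    // Would be called at the start of poll()/commit methods.
    void maybeThrowFencedError() {
        RuntimeException e = fencedError.get();
        if (e != null) {
            throw e;
        }
    }
}
{code}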
[jira] [Commented] (KAFKA-16111) Implement tests for tricky rebalance callback scenarios
[ https://issues.apache.org/jira/browse/KAFKA-16111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819997#comment-17819997 ] Lucas Brutschy commented on KAFKA-16111: https://github.com/apache/kafka/pull/15408 Example test > Implement tests for tricky rebalance callback scenarios > --- > > Key: KAFKA-16111 > URL: https://issues.apache.org/jira/browse/KAFKA-16111 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Lucas Brutschy >Priority: Major > Labels: callback, consumer-threading-refactor, integration-tests > Fix For: 3.8.0 > > > There is justified concern that the new threading model may not play well > with "tricky" {{ConsumerRebalanceListener}} callbacks. We need to provide > some assurance that it will support complicated patterns. > # Design and implement test scenarios > # Update and document any design changes with the callback sub-system where > needed > # Provide fix(es) to the {{AsyncKafkaConsumer}} implementation to abide by > said design -- This message was sent by Atlassian Jira (v8.20.10#820010)
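For illustration, here is the kind of "tricky" listener such tests need to exercise: one that re-enters the consumer from inside the rebalance callbacks. The class itself is a made-up example; the ConsumerRebalanceListener API is the real one:
{code:java}
import java.util.Collection;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// A callback that calls back into the consumer, which the new threading model
// must support without deadlocking or corrupting state.
final class TrickyListener implements ConsumerRebalanceListener {
    private final Consumer<?, ?> consumer;

    TrickyListener(Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync(); // re-entrant consumer call from within the callback
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            consumer.seek(tp, 0L); // repositioning from within the callback
        }
    }
}
{code}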
[jira] [Resolved] (KAFKA-16010) Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling
[ https://issues.apache.org/jira/browse/KAFKA-16010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy resolved KAFKA-16010. Resolution: Duplicate > Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling > -- > > Key: KAFKA-16010 > URL: https://issues.apache.org/jira/browse/KAFKA-16010 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Lucas Brutschy >Priority: Critical > Labels: consumer-threading-refactor, integration-tests, timeout > Fix For: 3.8.0 > > > The integration test > {{PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling}} is > failing when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Did not get valid assignment for > partitions [topic1-2, topic1-4, topic-1, topic-0, topic1-5, topic1-1, > topic1-0, topic1-3] after one consumer left > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.validateGroupAssignment(AbstractConsumerTest.scala:286) > at > kafka.api.PlaintextConsumerTest.runMultiConsumerSessionTimeoutTest(PlaintextConsumerTest.scala:1883) > at > kafka.api.PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling(PlaintextConsumerTest.scala:1281) > {code} > The logs include these lines: > > {code} > [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16010) Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling
[ https://issues.apache.org/jira/browse/KAFKA-16010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819289#comment-17819289 ] Lucas Brutschy commented on KAFKA-16010: [~kirktrue] I think you enabled this test already > Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling > -- > > Key: KAFKA-16010 > URL: https://issues.apache.org/jira/browse/KAFKA-16010 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Lucas Brutschy >Priority: Critical > Labels: consumer-threading-refactor, integration-tests, timeout > Fix For: 3.8.0 > > > The integration test > {{PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling}} is > failing when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Did not get valid assignment for > partitions [topic1-2, topic1-4, topic-1, topic-0, topic1-5, topic1-1, > topic1-0, topic1-3] after one consumer left > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.validateGroupAssignment(AbstractConsumerTest.scala:286) > at > kafka.api.PlaintextConsumerTest.runMultiConsumerSessionTimeoutTest(PlaintextConsumerTest.scala:1883) > at > kafka.api.PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling(PlaintextConsumerTest.scala:1281) > {code} > The logs include these lines: > > {code} > [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16010) Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling
[ https://issues.apache.org/jira/browse/KAFKA-16010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819289#comment-17819289 ] Lucas Brutschy edited comment on KAFKA-16010 at 2/21/24 3:14 PM: - [~kirktrue] I think you enabled this test already. Closing as duplicate was (Author: JIRAUSER302322): [~kirktrue] I think you enabled this test already > Fix PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling > -- > > Key: KAFKA-16010 > URL: https://issues.apache.org/jira/browse/KAFKA-16010 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Lucas Brutschy >Priority: Critical > Labels: consumer-threading-refactor, integration-tests, timeout > Fix For: 3.8.0 > > > The integration test > {{PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling}} is > failing when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Did not get valid assignment for > partitions [topic1-2, topic1-4, topic-1, topic-0, topic1-5, topic1-1, > topic1-0, topic1-3] after one consumer left > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.validateGroupAssignment(AbstractConsumerTest.scala:286) > at > kafka.api.PlaintextConsumerTest.runMultiConsumerSessionTimeoutTest(PlaintextConsumerTest.scala:1883) > at > kafka.api.PlaintextConsumerTest.testMultiConsumerSessionTimeoutOnStopPolling(PlaintextConsumerTest.scala:1281) > {code} > The logs include these lines: > > {code} > [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > [2023-12-13 15:26:40,736] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16290) Investigate propagating subscription state updates via queues
[ https://issues.apache.org/jira/browse/KAFKA-16290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819144#comment-17819144 ] Lucas Brutschy edited comment on KAFKA-16290 at 2/21/24 9:16 AM: - Subscription state is updated directly which may cause an illegal state exception. See https://issues.apache.org/jira/browse/KAFKA-16227 was (Author: JIRAUSER302322): Subscription state is updated directly which may cause an illegal state exception in https://issues.apache.org/jira/browse/KAFKA-16227 > Investigate propagating subscription state updates via queues > - > > Key: KAFKA-16290 > URL: https://issues.apache.org/jira/browse/KAFKA-16290 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lucas Brutschy >Priority: Major > Labels: kip-848 > > We are mostly using the queues for interaction between application thread and > background thread, but the subscription object is shared between the threads, > and it is updated directly without going through the queues. > The way we allow updates to the subscription state from both threads is > definitely not right, and will bring trouble. Places like the assign() is > probably the most obvious, where we send an event to the background to > commit, but then update the subscription in the foreground right away. > It seems sensible to aim for having all updates to the subscription state in > the background, triggered from the app thread via events (and I think we > already have related events for all updates, just that the subscription state > was left out in the app thread). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16290) Investigate propagating subscription state updates via queues
[ https://issues.apache.org/jira/browse/KAFKA-16290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819143#comment-17819143 ] Lucas Brutschy edited comment on KAFKA-16290 at 2/21/24 9:15 AM: - Subscription state is updated directly, which may cause records to be fetched immediately, but group metadata is propagated to the application thread only via queues, so we end up in a race condition where group metadata is not available but we already fetched records from our group. See https://issues.apache.org/jira/browse/KAFKA-16194 was (Author: JIRAUSER302322): Subscription state is updated directly, which may cause records to be fetched immediately, but group metadata is propagated to the application thread only via queues, so we end up in a race condition where group metadata is not available but we already fetched records from our group. > Investigate propagating subscription state updates via queues > - > > Key: KAFKA-16290 > URL: https://issues.apache.org/jira/browse/KAFKA-16290 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lucas Brutschy >Priority: Major > Labels: kip-848 > > We are mostly using the queues for interaction between application thread and > background thread, but the subscription object is shared between the threads, > and it is updated directly without going through the queues. > The way we allow updates to the subscription state from both threads is > definitely not right, and will bring trouble. Places like the assign() is > probably the most obvious, where we send an event to the background to > commit, but then update the subscription in the foreground right away. > It seems sensible to aim for having all updates to the subscription state in > the background, triggered from the app thread via events (and I think we > already have related events for all updates, just that the subscription state > was left out in the app thread). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16290) Investigate propagating subscription state updates via queues
[ https://issues.apache.org/jira/browse/KAFKA-16290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819144#comment-17819144 ] Lucas Brutschy commented on KAFKA-16290: Subscription state is updated directly which may cause an illegal state exception in https://issues.apache.org/jira/browse/KAFKA-16227 > Investigate propagating subscription state updates via queues > - > > Key: KAFKA-16290 > URL: https://issues.apache.org/jira/browse/KAFKA-16290 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lucas Brutschy >Priority: Major > Labels: kip-848 > > We are mostly using the queues for interaction between application thread and > background thread, but the subscription object is shared between the threads, > and it is updated directly without going through the queues. > The way we allow updates to the subscription state from both threads is > definitely not right, and will bring trouble. Places like the assign() is > probably the most obvious, where we send an event to the background to > commit, but then update the subscription in the foreground right away. > It seems sensible to aim for having all updates to the subscription state in > the background, triggered from the app thread via events (and I think we > already have related events for all updates, just that the subscription state > was left out in the app thread). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16290) Investigate propagating subscription state updates via queues
[ https://issues.apache.org/jira/browse/KAFKA-16290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819143#comment-17819143 ] Lucas Brutschy commented on KAFKA-16290: Subscription state is updated directly, which may cause records to be fetched immediately, but group metadata is propagated to the application thread only via queues, so we end up in a race condition where group metadata is not available but we already fetched records from our group. > Investigate propagating subscription state updates via queues > - > > Key: KAFKA-16290 > URL: https://issues.apache.org/jira/browse/KAFKA-16290 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lucas Brutschy >Priority: Major > Labels: kip-848 > > We are mostly using the queues for interaction between application thread and > background thread, but the subscription object is shared between the threads, > and it is updated directly without going through the queues. > The way we allow updates to the subscription state from both threads is > definitely not right, and will bring trouble. Places like the assign() is > probably the most obvious, where we send an event to the background to > commit, but then update the subscription in the foreground right away. > It seems sensible to aim for having all updates to the subscription state in > the background, triggered from the app thread via events (and I think we > already have related events for all updates, just that the subscription state > was left out in the app thread). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16290) Investigate propagating subscription state updates via queues
Lucas Brutschy created KAFKA-16290: -- Summary: Investigate propagating subscription state updates via queues Key: KAFKA-16290 URL: https://issues.apache.org/jira/browse/KAFKA-16290 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lucas Brutschy We are mostly using the queues for interaction between application thread and background thread, but the subscription object is shared between the threads, and it is updated directly without going through the queues. The way we allow updates to the subscription state from both threads is definitely not right, and will bring trouble. Places like assign() are probably the most obvious, where we send an event to the background to commit, but then update the subscription in the foreground right away. It seems sensible to aim for having all updates to the subscription state in the background, triggered from the app thread via events (and I think we already have related events for all updates, just that the subscription state was left out in the app thread). -- This message was sent by Atlassian Jira (v8.20.10#820010)
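A hypothetical sketch of that direction: the application thread only enqueues an event, and the background thread is the single writer of the subscription state. The event and class names here are illustrative, not the real ApplicationEvent hierarchy:
{code:java}
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import org.apache.kafka.common.TopicPartition;

// Illustrative event carrying the requested assignment to the background thread.
final class AssignPartitionsEvent {
    final Set<TopicPartition> partitions;

    AssignPartitionsEvent(Set<TopicPartition> partitions) {
        this.partitions = partitions;
    }
}

final class AssignSketch {
    private final BlockingQueue<AssignPartitionsEvent> applicationEventQueue;

    AssignSketch(BlockingQueue<AssignPartitionsEvent> applicationEventQueue) {
        this.applicationEventQueue = applicationEventQueue;
    }

    // Application thread: no direct mutation of the subscription object. The
    // background thread applies the update when it processes the event.
    void assign(Set<TopicPartition> partitions) {
        applicationEventQueue.add(new AssignPartitionsEvent(partitions));
    }
}
{code}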
[jira] [Updated] (KAFKA-16284) Performance regression in RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-16284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-16284: --- Fix Version/s: (was: 3.8.0) > Performance regression in RocksDB > - > > Key: KAFKA-16284 > URL: https://issues.apache.org/jira/browse/KAFKA-16284 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > > In benchmarks, we are noticing a performance regression in the performance of > `RocksDBStore`. > The regression happens between those two commits: > > {code:java} > trunk - 70c8b8d0af - regressed - 2024-01-06T14:00:20Z > trunk - d5aa341a18 - not regressed - 2023-12-31T11:47:14Z > {code} > The regression can be reproduced by the following test: > > {code:java} > package org.apache.kafka.streams.state.internals; > import org.apache.kafka.common.serialization.Serdes; > import org.apache.kafka.common.utils.Bytes; > import org.apache.kafka.streams.StreamsConfig; > import org.apache.kafka.streams.processor.StateStoreContext; > import org.apache.kafka.test.InternalMockProcessorContext; > import org.apache.kafka.test.MockRocksDbConfigSetter; > import org.apache.kafka.test.StreamsTestUtils; > import org.apache.kafka.test.TestUtils; > import org.junit.Before; > import org.junit.Test; > import java.io.File; > import java.nio.ByteBuffer; > import java.util.Properties; > public class RocksDBStorePerfTest { > InternalMockProcessorContext context; > RocksDBStore rocksDBStore; > final static String DB_NAME = "db-name"; > final static String METRICS_SCOPE = "metrics-scope"; > RocksDBStore getRocksDBStore() { > return new RocksDBStore(DB_NAME, METRICS_SCOPE); > } > @Before > public void setUp() { > final Properties props = StreamsTestUtils.getStreamsConfig(); > props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, > MockRocksDbConfigSetter.class); > File dir = TestUtils.tempDirectory(); > context = new InternalMockProcessorContext<>( > dir, > Serdes.String(), > Serdes.String(), > new StreamsConfig(props) > ); > } > @Test > public void testPerf() { > long start = System.currentTimeMillis(); > for (int i = 0; i < 10; i++) { > System.out.println("Iteration: "+i+" Time: " + > (System.currentTimeMillis() - start)); > RocksDBStore rocksDBStore = getRocksDBStore(); > rocksDBStore.init((StateStoreContext) context, rocksDBStore); > for (int j = 0; j < 100; j++) { > rocksDBStore.put(new > Bytes(ByteBuffer.allocate(4).putInt(j).array()), "perf".getBytes()); > } > rocksDBStore.close(); > } > long end = System.currentTimeMillis(); > System.out.println("Time: " + (end - start)); > } > } > {code} > > I have isolated the regression to commit > [5bc3aa4|https://github.com/apache/kafka/commit/5bc3aa428067dff1f2b9075ff5d1351fb05d4b10]. > On my machine, the test takes ~8 seconds before > [5bc3aa4|https://github.com/apache/kafka/commit/5bc3aa428067dff1f2b9075ff5d1351fb05d4b10] > and ~30 seconds after > [5bc3aa4|https://github.com/apache/kafka/commit/5bc3aa428067dff1f2b9075ff5d1351fb05d4b10]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16284) Performance regression in RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-16284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-16284: --- Fix Version/s: 3.8.0 > Performance regression in RocksDB > - > > Key: KAFKA-16284 > URL: https://issues.apache.org/jira/browse/KAFKA-16284 > Project: Kafka > Issue Type: Task > Components: streams >Affects Versions: 3.8.0 >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > Fix For: 3.8.0 > > > In benchmarks, we are noticing a performance regression in the performance of > `RocksDBStore`. > The regression happens between those two commits: > > {code:java} > trunk - 70c8b8d0af - regressed - 2024-01-06T14:00:20Z > trunk - d5aa341a18 - not regressed - 2023-12-31T11:47:14Z > {code} > The regression can be reproduced by the following test: > > {code:java} > package org.apache.kafka.streams.state.internals; > import org.apache.kafka.common.serialization.Serdes; > import org.apache.kafka.common.utils.Bytes; > import org.apache.kafka.streams.StreamsConfig; > import org.apache.kafka.streams.processor.StateStoreContext; > import org.apache.kafka.test.InternalMockProcessorContext; > import org.apache.kafka.test.MockRocksDbConfigSetter; > import org.apache.kafka.test.StreamsTestUtils; > import org.apache.kafka.test.TestUtils; > import org.junit.Before; > import org.junit.Test; > import java.io.File; > import java.nio.ByteBuffer; > import java.util.Properties; > public class RocksDBStorePerfTest { > InternalMockProcessorContext context; > RocksDBStore rocksDBStore; > final static String DB_NAME = "db-name"; > final static String METRICS_SCOPE = "metrics-scope"; > RocksDBStore getRocksDBStore() { > return new RocksDBStore(DB_NAME, METRICS_SCOPE); > } > @Before > public void setUp() { > final Properties props = StreamsTestUtils.getStreamsConfig(); > props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, > MockRocksDbConfigSetter.class); > File dir = TestUtils.tempDirectory(); > context = new InternalMockProcessorContext<>( > dir, > Serdes.String(), > Serdes.String(), > new StreamsConfig(props) > ); > } > @Test > public void testPerf() { > long start = System.currentTimeMillis(); > for (int i = 0; i < 10; i++) { > System.out.println("Iteration: "+i+" Time: " + > (System.currentTimeMillis() - start)); > RocksDBStore rocksDBStore = getRocksDBStore(); > rocksDBStore.init((StateStoreContext) context, rocksDBStore); > for (int j = 0; j < 100; j++) { > rocksDBStore.put(new > Bytes(ByteBuffer.allocate(4).putInt(j).array()), "perf".getBytes()); > } > rocksDBStore.close(); > } > long end = System.currentTimeMillis(); > System.out.println("Time: " + (end - start)); > } > } > {code} > > I have isolated the regression to commit > [5bc3aa4|https://github.com/apache/kafka/commit/5bc3aa428067dff1f2b9075ff5d1351fb05d4b10]. > On my machine, the test takes ~8 seconds before > [5bc3aa4|https://github.com/apache/kafka/commit/5bc3aa428067dff1f2b9075ff5d1351fb05d4b10] > and ~30 seconds after > [5bc3aa4|https://github.com/apache/kafka/commit/5bc3aa428067dff1f2b9075ff5d1351fb05d4b10]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16284) Performance regression in RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-16284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-16284: --- Affects Version/s: (was: 3.8.0) > Performance regression in RocksDB > - > > Key: KAFKA-16284 > URL: https://issues.apache.org/jira/browse/KAFKA-16284 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > Fix For: 3.8.0 > > > In benchmarks, we are noticing a performance regression in the performance of > `RocksDBStore`. > The regression happens between those two commits: > > {code:java} > trunk - 70c8b8d0af - regressed - 2024-01-06T14:00:20Z > trunk - d5aa341a18 - not regressed - 2023-12-31T11:47:14Z > {code} > The regression can be reproduced by the following test: > > {code:java} > package org.apache.kafka.streams.state.internals; > import org.apache.kafka.common.serialization.Serdes; > import org.apache.kafka.common.utils.Bytes; > import org.apache.kafka.streams.StreamsConfig; > import org.apache.kafka.streams.processor.StateStoreContext; > import org.apache.kafka.test.InternalMockProcessorContext; > import org.apache.kafka.test.MockRocksDbConfigSetter; > import org.apache.kafka.test.StreamsTestUtils; > import org.apache.kafka.test.TestUtils; > import org.junit.Before; > import org.junit.Test; > import java.io.File; > import java.nio.ByteBuffer; > import java.util.Properties; > public class RocksDBStorePerfTest { > InternalMockProcessorContext context; > RocksDBStore rocksDBStore; > final static String DB_NAME = "db-name"; > final static String METRICS_SCOPE = "metrics-scope"; > RocksDBStore getRocksDBStore() { > return new RocksDBStore(DB_NAME, METRICS_SCOPE); > } > @Before > public void setUp() { > final Properties props = StreamsTestUtils.getStreamsConfig(); > props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, > MockRocksDbConfigSetter.class); > File dir = TestUtils.tempDirectory(); > context = new InternalMockProcessorContext<>( > dir, > Serdes.String(), > Serdes.String(), > new StreamsConfig(props) > ); > } > @Test > public void testPerf() { > long start = System.currentTimeMillis(); > for (int i = 0; i < 10; i++) { > System.out.println("Iteration: "+i+" Time: " + > (System.currentTimeMillis() - start)); > RocksDBStore rocksDBStore = getRocksDBStore(); > rocksDBStore.init((StateStoreContext) context, rocksDBStore); > for (int j = 0; j < 100; j++) { > rocksDBStore.put(new > Bytes(ByteBuffer.allocate(4).putInt(j).array()), "perf".getBytes()); > } > rocksDBStore.close(); > } > long end = System.currentTimeMillis(); > System.out.println("Time: " + (end - start)); > } > } > {code} > > I have isolated the regression to commit > [5bc3aa4|https://github.com/apache/kafka/commit/5bc3aa428067dff1f2b9075ff5d1351fb05d4b10]. > On my machine, the test takes ~8 seconds before > [5bc3aa4|https://github.com/apache/kafka/commit/5bc3aa428067dff1f2b9075ff5d1351fb05d4b10] > and ~30 seconds after > [5bc3aa4|https://github.com/apache/kafka/commit/5bc3aa428067dff1f2b9075ff5d1351fb05d4b10]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16284) Performance regression in RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-16284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-16284: -- Assignee: Lucas Brutschy > Performance regression in RocksDB > - > > Key: KAFKA-16284 > URL: https://issues.apache.org/jira/browse/KAFKA-16284 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > > In benchmarks, we are noticing a performance regression in the performance of > `RocksDBStore`. > The regression happens between those two commits: > > {code:java} > trunk - 70c8b8d0af - regressed - 2024-01-06T14:00:20Z > trunk - d5aa341a18 - not regressed - 2023-12-31T11:47:14Z > {code} > The regression can be reproduced by the following test: > > {code:java} > package org.apache.kafka.streams.state.internals; > import org.apache.kafka.common.serialization.Serdes; > import org.apache.kafka.common.utils.Bytes; > import org.apache.kafka.streams.StreamsConfig; > import org.apache.kafka.streams.processor.StateStoreContext; > import org.apache.kafka.test.InternalMockProcessorContext; > import org.apache.kafka.test.MockRocksDbConfigSetter; > import org.apache.kafka.test.StreamsTestUtils; > import org.apache.kafka.test.TestUtils; > import org.junit.Before; > import org.junit.Test; > import java.io.File; > import java.nio.ByteBuffer; > import java.util.Properties; > public class RocksDBStorePerfTest { > InternalMockProcessorContext context; > RocksDBStore rocksDBStore; > final static String DB_NAME = "db-name"; > final static String METRICS_SCOPE = "metrics-scope"; > RocksDBStore getRocksDBStore() { > return new RocksDBStore(DB_NAME, METRICS_SCOPE); > } > @Before > public void setUp() { > final Properties props = StreamsTestUtils.getStreamsConfig(); > props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, > MockRocksDbConfigSetter.class); > File dir = TestUtils.tempDirectory(); > context = new InternalMockProcessorContext<>( > dir, > Serdes.String(), > Serdes.String(), > new StreamsConfig(props) > ); > } > @Test > public void testPerf() { > long start = System.currentTimeMillis(); > for (int i = 0; i < 10; i++) { > System.out.println("Iteration: "+i+" Time: " + > (System.currentTimeMillis() - start)); > RocksDBStore rocksDBStore = getRocksDBStore(); > rocksDBStore.init((StateStoreContext) context, rocksDBStore); > for (int j = 0; j < 100; j++) { > rocksDBStore.put(new > Bytes(ByteBuffer.allocate(4).putInt(j).array()), "perf".getBytes()); > } > rocksDBStore.close(); > } > long end = System.currentTimeMillis(); > System.out.println("Time: " + (end - start)); > } > } > {code} > > I have isolated the regression to commit > [5bc3aa4|https://github.com/apache/kafka/commit/5bc3aa428067dff1f2b9075ff5d1351fb05d4b10]. > On my machine, the test takes ~8 seconds before > [5bc3aa4|https://github.com/apache/kafka/commit/5bc3aa428067dff1f2b9075ff5d1351fb05d4b10] > and ~30 seconds after > [5bc3aa4|https://github.com/apache/kafka/commit/5bc3aa428067dff1f2b9075ff5d1351fb05d4b10]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16284) Performance regression in RocksDB
Lucas Brutschy created KAFKA-16284: -- Summary: Performance regression in RocksDB Key: KAFKA-16284 URL: https://issues.apache.org/jira/browse/KAFKA-16284 Project: Kafka Issue Type: Task Components: streams Reporter: Lucas Brutschy In benchmarks, we are noticing a performance regression in `RocksDBStore`. The regression happens between those two commits: {code:java} trunk - 70c8b8d0af - regressed - 2024-01-06T14:00:20Z trunk - d5aa341a18 - not regressed - 2023-12-31T11:47:14Z {code} The regression can be reproduced by the following test: {code:java} package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.test.InternalMockProcessorContext; import org.apache.kafka.test.MockRocksDbConfigSetter; import org.apache.kafka.test.StreamsTestUtils; import org.apache.kafka.test.TestUtils; import org.junit.Before; import org.junit.Test; import java.io.File; import java.nio.ByteBuffer; import java.util.Properties; public class RocksDBStorePerfTest { InternalMockProcessorContext context; RocksDBStore rocksDBStore; final static String DB_NAME = "db-name"; final static String METRICS_SCOPE = "metrics-scope"; RocksDBStore getRocksDBStore() { return new RocksDBStore(DB_NAME, METRICS_SCOPE); } @Before public void setUp() { final Properties props = StreamsTestUtils.getStreamsConfig(); props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class); File dir = TestUtils.tempDirectory(); context = new InternalMockProcessorContext<>( dir, Serdes.String(), Serdes.String(), new StreamsConfig(props) ); } @Test public void testPerf() { long start = System.currentTimeMillis(); for (int i = 0; i < 10; i++) { System.out.println("Iteration: "+i+" Time: " + (System.currentTimeMillis() - start)); RocksDBStore rocksDBStore = getRocksDBStore(); rocksDBStore.init((StateStoreContext) context, rocksDBStore); for (int j = 0; j < 100; j++) { rocksDBStore.put(new Bytes(ByteBuffer.allocate(4).putInt(j).array()), "perf".getBytes()); } rocksDBStore.close(); } long end = System.currentTimeMillis(); System.out.println("Time: " + (end - start)); } } {code} I have isolated the regression to commit [5bc3aa4|https://github.com/apache/kafka/commit/5bc3aa428067dff1f2b9075ff5d1351fb05d4b10]. On my machine, the test takes ~8 seconds before [5bc3aa4|https://github.com/apache/kafka/commit/5bc3aa428067dff1f2b9075ff5d1351fb05d4b10] and ~30 seconds after [5bc3aa4|https://github.com/apache/kafka/commit/5bc3aa428067dff1f2b9075ff5d1351fb05d4b10]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16243) Idle kafka-console-consumer with new consumer group protocol preemptively leaves group
[ https://issues.apache.org/jira/browse/KAFKA-16243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-16243: --- Fix Version/s: 3.8.0 > Idle kafka-console-consumer with new consumer group protocol preemptively > leaves group > -- > > Key: KAFKA-16243 > URL: https://issues.apache.org/jira/browse/KAFKA-16243 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.7.0 >Reporter: Andrew Schofield >Assignee: Lucas Brutschy >Priority: Critical > Fix For: 3.8.0 > > > Using the new consumer group protocol with kafka-console-consumer.sh, I find > that if I leave the consumer with no records to process for 5 minutes > (max.poll.interval.ms = 300000ms), the tool logs the following warning > message and leaves the group. > "consumer poll timeout has expired. This means the time between subsequent > calls to poll() was longer than the configured max.poll.interval.ms, which > typically implies that the poll loop is spending too much time processing > messages. You can address this either by increasing max.poll.interval.ms or > by reducing the maximum size of batches returned in poll() with > max.poll.records." > With the older consumer, this did not occur. > The reason is that the consumer keeps a poll timer which is used to ensure > liveness of the application thread. The poll timer automatically updates > while the `Consumer.poll(Duration)` method is blocked, while the newer > consumer only updates the poll timer when a new call to > `Consumer.poll(Duration)` is issued. This means that the > kafka-console-consumer.sh tool, which uses a very long timeout by default, > works differently with the new consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
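The usage pattern that exposes the difference is simply a long blocking poll on an idle topic, roughly what kafka-console-consumer.sh does. The loop below is a sketch (consumer construction and subscription omitted); the ten-minute timeout is an arbitrary stand-in for "very long":
{code:java}
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

final class IdleConsumerLoop {
    // The classic consumer keeps advancing its poll timer while poll() blocks, so
    // this idle loop never exceeds max.poll.interval.ms. The new consumer resets
    // the timer only on each new poll() call, hence the premature leave.
    static void consumeForever(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMinutes(10));
            records.forEach(r -> System.out.println(r.value()));
        }
    }
}
{code}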
[jira] [Reopened] (KAFKA-16167) Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup
[ https://issues.apache.org/jira/browse/KAFKA-16167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reopened KAFKA-16167: > Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup > -- > > Key: KAFKA-16167 > URL: https://issues.apache.org/jira/browse/KAFKA-16167 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, integration-tests, > kip-848-client-support > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16167) Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup
[ https://issues.apache.org/jira/browse/KAFKA-16167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818487#comment-17818487 ] Lucas Brutschy edited comment on KAFKA-16167 at 2/19/24 2:01 PM: - Looks like the test is still flaky. In an unrelated PR I got this: [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15219/10/tests] ``` java.lang.NullPointerException: Cannot invoke "org.apache.kafka.clients.consumer.OffsetAndMetadata.offset()" because the return value of "java.util.Map.get(Object)" is null at kafka.api.PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup(PlaintextConsumerTest.scala:316) ``` was (Author: JIRAUSER302322): Looks like the test is still flaky. In an unrelated PR I got this: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15219/10/tests > Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup > -- > > Key: KAFKA-16167 > URL: https://issues.apache.org/jira/browse/KAFKA-16167 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, integration-tests, > kip-848-client-support > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16167) Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup
[ https://issues.apache.org/jira/browse/KAFKA-16167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818487#comment-17818487 ] Lucas Brutschy commented on KAFKA-16167: Looks like the test is still flaky. In an unrelated PR I got this: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15219/10/tests > Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup > -- > > Key: KAFKA-16167 > URL: https://issues.apache.org/jira/browse/KAFKA-16167 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, integration-tests, > kip-848-client-support > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16167) Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup
[ https://issues.apache.org/jira/browse/KAFKA-16167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-16167: --- Reviewer: Lucas Brutschy (was: Lucas Brutschy) > Fix PlaintextConsumerTest.testAutoCommitOnCloseAfterWakeup > -- > > Key: KAFKA-16167 > URL: https://issues.apache.org/jira/browse/KAFKA-16167 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, integration-tests, > kip-848-client-support > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16104) Enable additional PlaintextConsumerTest tests for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-16104: --- Reviewer: Lucas Brutschy (was: Lucas Brutschy) > Enable additional PlaintextConsumerTest tests for new consumer > -- > > Key: KAFKA-16104 > URL: https://issues.apache.org/jira/browse/KAFKA-16104 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Andrew Schofield >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, integration-tests > Fix For: 3.8.0 > > > It should be possible to enable: > * testAutoCommitOnClose > * testAutoCommitOnCloseAfterWakeup > * testExpandingTopicSubscriptions > * testShrinkingTopicSubscriptions > * testMultiConsumerSessionTimeoutOnClose (KAFKA-16011 has been fixed) > * testMultiConsumerSessionTimeoutOnStopPolling > * testAutoCommitOnRebalance > * testPerPartitionLeadMetricsCleanUpWithSubscribe > * testPerPartitionLagMetricsCleanUpWithSubscribe > * testStaticConsumerDetectsNewPartitionCreatedAfterRestart -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16230) Update verifiable_consumer.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-16230: --- Reviewer: Lucas Brutschy (was: Lucas Brutschy) > Update verifiable_consumer.py to support KIP-848’s group protocol config > > > Key: KAFKA-16230 > URL: https://issues.apache.org/jira/browse/KAFKA-16230 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > This task is to update {{verifiable_consumer.py}} to support the > {{group.protocol}} configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument. It will default to > classic and we will take a separate task (Jira) to update the callers. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15833) Restrict Consumer API to be used from one thread
[ https://issues.apache.org/jira/browse/KAFKA-15833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15833: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Restrict Consumer API to be used from one thread > > > Key: KAFKA-15833 > URL: https://issues.apache.org/jira/browse/KAFKA-15833 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support, > kip-848-preview > Fix For: 3.7.0 > > > The legacy consumer restricts the API to be used from one thread only. This > is not enforced in the new consumer. To avoid inconsistencies in the > behavior, we should enforce the same restriction in the new consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
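For reference, the enforcement in the legacy consumer is a simple thread-affinity check, shown here slightly condensed from the classic KafkaConsumer's acquire/release logic (the reference counting for nested calls is omitted):
{code:java}
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicLong;

// Every public API call acquires the guard and fails fast if another thread is
// already inside; release() is called in a finally block on the way out.
final class SingleThreadGuard {
    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);

    void acquire() {
        long threadId = Thread.currentThread().getId();
        if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
            throw new ConcurrentModificationException("Consumer is not safe for multi-threaded access");
    }

    void release() {
        currentThread.set(NO_CURRENT_THREAD);
    }
}
{code}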
[jira] [Assigned] (KAFKA-16198) Reconciliation may lose partitions when topic metadata is delayed
[ https://issues.apache.org/jira/browse/KAFKA-16198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-16198: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Reconciliation may lose partitions when topic metadata is delayed > - > > Key: KAFKA-16198 > URL: https://issues.apache.org/jira/browse/KAFKA-16198 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Critical > Labels: client-transitions-issues, kip-848-client-support > Fix For: 3.8.0 > > > The current reconciliation code in `AsyncKafkaConsumer`'s `MembershipManager` > may lose part of the server-provided assignment when metadata is delayed. The > reason is incorrect handling of partially resolved topic names, as in this > example: > * We get assigned {{T1-1}} and {{T2-1}} > * We reconcile {{{}T1-1{}}}, {{T2-1}} remains in {{assignmentUnresolved}} > since the topic id {{T2}} is not known yet > * We get new cluster metadata, which includes {{{}T2{}}}, so {{T2-1}} is > moved to {{assignmentReadyToReconcile}} > * We call {{reconcile}} -- {{T2-1}} is now treated as the full assignment, > so {{T1-1}} is being revoked > * We end up with assignment {{T2-1}}, which is inconsistent with the > broker-side target assignment. > > Generally, this seems to be a problem around semantics of the internal > collections `assignmentUnresolved` and `assignmentReadyToReconcile`. Absence > of a topic in `assignmentReadyToReconcile` may mean either revocation of the > topic partition(s), or unavailability of a topic name for the topic. > Internal state with simpler and correct invariants could be achieved by using > a single collection `currentTargetAssignment` which is based on topic IDs and > always corresponds to the latest assignment received from the broker. During > every attempted reconciliation, all topic IDs will be resolved from the local > cache, which should not introduce a lot of overhead. `assignmentUnresolved` > and `assignmentReadyToReconcile` are removed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
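A sketch of the proposed invariant, with hypothetical names: keep a single topic-ID-keyed view of the broker's latest target and re-resolve names from metadata on each reconciliation attempt, so unresolved topics stay pending instead of looking like revocations. The class below is illustrative only, not the real MembershipManager code:
{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;

final class TargetAssignmentSketch {
    // Always the latest broker-provided target assignment, keyed by topic ID.
    private final Map<Uuid, Set<Integer>> currentTargetAssignment = new HashMap<>();

    // Resolve whatever the current metadata can resolve; topics whose names are
    // still unknown simply remain in currentTargetAssignment for the next attempt,
    // rather than being treated as revoked.
    Set<TopicPartition> resolveReconcilable(Map<Uuid, String> topicNamesById) {
        Set<TopicPartition> resolved = new HashSet<>();
        currentTargetAssignment.forEach((topicId, partitions) -> {
            String name = topicNamesById.get(topicId);
            if (name != null) {
                partitions.forEach(p -> resolved.add(new TopicPartition(name, p)));
            }
        });
        return resolved;
    }
}
{code}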
[jira] [Assigned] (KAFKA-15913) Migrate AsyncKafkaConsumerTest away from ConsumerTestBuilder
[ https://issues.apache.org/jira/browse/KAFKA-15913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15913: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Migrate AsyncKafkaConsumerTest away from ConsumerTestBuilder > > > Key: KAFKA-15913 > URL: https://issues.apache.org/jira/browse/KAFKA-15913 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Lucas Brutschy >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.7.0 > > > ConsumerTestBuilder is meant to be an unit testing utility; however, we seem > to use Mockito#spy quite liberally. This is not the right testing strategy > because we basically turn unit testing into integration testing. > > While the current unit tests run fine, we should probably make the mocking > using Mockito#mock by default and test each dependency independently. > > The ask here is > # Make mock(class) by default > # Provide more flexible interface for the testBuilder to allow user to > configure spy or mock. Or, let user pass in their own mock. -- This message was sent by Atlassian Jira (v8.20.10#820010)
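The distinction being drawn, in two lines of Mockito; a plain java.util.List stands in for the consumer's real dependencies:
{code:java}
import java.util.ArrayList;
import java.util.List;
import org.mockito.Mockito;

final class MockVsSpy {
    @SuppressWarnings("unchecked")
    static void demo() {
        List<String> mocked = Mockito.mock(List.class);       // pure stub: no real code runs, a true unit test
        List<String> spied = Mockito.spy(new ArrayList<>());  // wraps a real object: calls execute real code,
                                                              // which is closer to an integration test
        mocked.add("a"); // interaction recorded, but the list stays empty
        spied.add("a");  // actually added to the underlying ArrayList
    }
}
{code}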
[jira] [Assigned] (KAFKA-14624) State restoration is broken with standby tasks and cache-enabled stores in processor API
[ https://issues.apache.org/jira/browse/KAFKA-14624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-14624: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > State restoration is broken with standby tasks and cache-enabled stores in > processor API > > > Key: KAFKA-14624 > URL: https://issues.apache.org/jira/browse/KAFKA-14624 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.1 >Reporter: Balaji Rao >Assignee: Lucas Brutschy >Priority: Major > > I found that cache-enabled state stores in PAPI with standby tasks sometimes > return stale data when a partition moves from one app instance to another > and back. [Here's|https://github.com/balajirrao/kafka-streams-multi-runner] a > small project that I used to reproduce the issue. > I dug around a bit and it seems like it's a bug in standby task state > restoration when caching is enabled. If a partition moves from instance 1 to > 2 and then back to instance 1, since the `CachingKeyValueStore` doesn't > register a restore callback, it can return potentially stale data for > non-dirty keys. > I could fix the issue by modifying the `CachingKeyValueStore` to register a > restore callback in which the restored keys are added to the cache. Is > this fix in the right direction? > {code:java} > // register the store > context.register( > root, > (RecordBatchingStateRestoreCallback) records -> { > for (final ConsumerRecord<byte[], byte[]> record : > records) { > put(Bytes.wrap(record.key()), record.value()); > } > } > ); > {code} > > I would like to contribute a fix, if I can get some help! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15941) Flaky test: shouldRestoreNullRecord() – org.apache.kafka.streams.integration.RestoreIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-15941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15941: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Flaky test: shouldRestoreNullRecord() – > org.apache.kafka.streams.integration.RestoreIntegrationTest > --- > > Key: KAFKA-15941 > URL: https://issues.apache.org/jira/browse/KAFKA-15941 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Apoorv Mittal >Assignee: Lucas Brutschy >Priority: Major > Labels: flaky-test > > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14699/24/tests/ > {code:java} > org.opentest4j.AssertionFailedError: Condition not met within timeout 6. > Did not receive all [KeyValue(2, \x00\x00\x00)] records from topic output > (got []) ==> expected: but was: > Stacktraceorg.opentest4j.AssertionFailedError: Condition not met > within timeout 6. Did not receive all [KeyValue(2, \x00\x00\x00)] records > from topic output (got []) ==> expected: but was: at > org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at > org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210) at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331) >at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:878) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:827) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:790) > at > org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRestoreNullRecord(RestoreIntegrationTest.java:244) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task
[ https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-12679: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Rebalancing a restoring or running task may cause directory livelocking with > newly created task > --- > > Key: KAFKA-12679 > URL: https://issues.apache.org/jira/browse/KAFKA-12679 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.1 > Environment: Broker and client version 2.6.1 > Multi-node broker cluster > Multi-node, auto scaling streams app instances >Reporter: Peter Nahas >Assignee: Lucas Brutschy >Priority: Major > Fix For: 3.7.0 > > Attachments: Backoff-between-directory-lock-attempts.patch > > > If a task that uses a state store is in the restoring state or in a running > state and the task gets rebalanced to a separate thread on the same instance, > the newly created task will attempt to lock the state store director while > the first thread is continuing to use it. This is totally normal and expected > behavior when the first thread is not yet aware of the rebalance. However, > that newly created task is effectively running a while loop with no backoff > waiting to lock the directory: > # TaskManager tells the task to restore in `tryToCompleteRestoration` > # The task attempts to lock the directory > # The lock attempt fails and throws a > `org.apache.kafka.streams.errors.LockException` > # TaskManager catches the exception, stops further processing on the task > and reports that not all tasks have restored > # The StreamThread `runLoop` continues to run. > I've seen some documentation indicate that there is supposed to be a backoff > when this condition occurs, but there does not appear to be any in the code. > The result is that if this goes on for long enough, the lock-loop may > dominate CPU usage in the process and starve out the old stream thread task > processing. > > When in this state, the DEBUG level logging for TaskManager will produce a > steady stream of messages like the following: > {noformat} > 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager > : stream-thread [StreamThread-10] Could not initialize 0_34 due > to the following exception; will retry > org.apache.kafka.streams.errors.LockException: stream-thread > [StreamThread-10] standby-task [0_34] Failed to lock the state directory for > task 0_34 > {noformat} > > > I've attached a git formatted patch to resolve the issue. Simply detect the > scenario and sleep for the backoff time in the appropriate StreamThread. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
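A hypothetical sketch of the attached patch's idea (all names here are assumed; see the patch for the real change): when restoration cannot complete because a state directory is still locked by another thread, sleep before the next run-loop pass instead of spinning:
{code:java}
// Illustrative only; the LockException handling happens inside the task manager,
// which reports that not all tasks have restored yet.
final class StreamThreadBackoffSketch {
    private static final long STATE_DIR_LOCK_BACKOFF_MS = 1000L; // assumed backoff value

    interface TaskManagerFacade {
        boolean tryToCompleteRestoration(); // false if some task could not lock its directory
    }

    void runOnce(TaskManagerFacade taskManager) throws InterruptedException {
        if (!taskManager.tryToCompleteRestoration()) {
            Thread.sleep(STATE_DIR_LOCK_BACKOFF_MS); // avoid the CPU-burning lock-loop
        }
    }
}
{code}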
[jira] [Assigned] (KAFKA-14343) Write upgrade/downgrade tests for enabling the state updater
[ https://issues.apache.org/jira/browse/KAFKA-14343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-14343: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Write upgrade/downgrade tests for enabling the state updater > - > > Key: KAFKA-14343 > URL: https://issues.apache.org/jira/browse/KAFKA-14343 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > > Write a test that verifies the upgrade from a version of Streams with state > updater disabled to a version with state updater enabled and vice versa, so > that we can offer a safe upgrade path. > * upgrade test from a version of Streams with state updater disabled to a > version with state updater enabled (probably a system test since the old code > path will be removed from the code base) > * downgrade test from a version of Streams with state updater enabled to a > version with state updater disabled (probably a system test since the old > code path will be removed from the code base) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14014) Flaky test NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
[ https://issues.apache.org/jira/browse/KAFKA-14014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-14014: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Flaky test > NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets() > > > Key: KAFKA-14014 > URL: https://issues.apache.org/jira/browse/KAFKA-14014 > Project: Kafka > Issue Type: Test > Components: streams >Reporter: Bruno Cadonna >Assignee: Lucas Brutschy >Priority: Critical > Labels: flaky-test > > {code:java} > java.lang.AssertionError: > Expected: <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 2)]> > but: was <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 1)]> > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) > at > org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:540) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:568) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at java.base/java.lang.Thread.run(Thread.java:833) > {code} > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/ > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_17_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15957) ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy broken
[ https://issues.apache.org/jira/browse/KAFKA-15957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15957: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy > broken > --- > > Key: KAFKA-15957 > URL: https://issues.apache.org/jira/browse/KAFKA-15957 > Project: Kafka > Issue Type: Bug >Reporter: Justine Olshan >Assignee: Lucas Brutschy >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15977) DelegationTokenEndToEndAuthorizationWithOwnerTest leaks threads
[ https://issues.apache.org/jira/browse/KAFKA-15977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15977: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > DelegationTokenEndToEndAuthorizationWithOwnerTest leaks threads > --- > > Key: KAFKA-15977 > URL: https://issues.apache.org/jira/browse/KAFKA-15977 > Project: Kafka > Issue Type: Bug >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > > [https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-pr/branches/PR-14878/runs/8/nodes/11/steps/90/log/?start=0] > > I had an unrelated PR fail with the following thread leak: > > {code:java} > Gradle Test Run :core:test > Gradle Test Executor 95 > > DelegationTokenEndToEndAuthorizationWithOwnerTest > executionError STARTED > kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.executionError > failed, log available in > /home/jenkins/workspace/Kafka_kafka-pr_PR-14878/core/build/reports/testOutput/kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.executionError.test.stdoutGradle > Test Run :core:test > Gradle Test Executor 95 > > DelegationTokenEndToEndAuthorizationWithOwnerTest > executionError FAILED > org.opentest4j.AssertionFailedError: Found 1 unexpected threads during > @AfterAll: `kafka-admin-client-thread | adminclient-483` ==> expected: > but was: {code} > > All subsequent tests in that run fail with initialization errors, > because the admin client thread is never closed. > > This is preceded by the following test failure: > > {code:java} > Gradle Test Run :core:test > Gradle Test Executor 95 > > DelegationTokenEndToEndAuthorizationWithOwnerTest > > testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(String, boolean) > [1] > quorum=kraft, isIdempotenceEnabled=true FAILED > org.opentest4j.AssertionFailedError: expected acls: > (principal=User:scram-user2, host=*, operation=CREATE_TOKENS, > permissionType=ALLOW) > (principal=User:scram-user2, host=*, operation=DESCRIBE_TOKENS, > permissionType=ALLOW) > but got: > > at > app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at app//org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at app//kafka.utils.TestUtils$.waitAndVerifyAcls(TestUtils.scala:1142) > at > app//kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.$anonfun$configureSecurityAfterServersStart$1(DelegationTokenEndToEndAuthorizationWithOwnerTest.scala:71) > at > app//kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.$anonfun$configureSecurityAfterServersStart$1$adapted(DelegationTokenEndToEndAuthorizationWithOwnerTest.scala:70) > at > app//scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576) > at > app//scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574) > at app//scala.collection.AbstractIterable.foreach(Iterable.scala:933) > at > app//kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.configureSecurityAfterServersStart(DelegationTokenEndToEndAuthorizationWithOwnerTest.scala:70){code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-13531) Flaky test NamedTopologyIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-13531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-13531: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Flaky test NamedTopologyIntegrationTest > --- > > Key: KAFKA-13531 > URL: https://issues.apache.org/jira/browse/KAFKA-13531 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Matthias J. Sax >Assignee: Lucas Brutschy >Priority: Critical > Labels: flaky-test > Attachments: > org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing().test.stdout > > > org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets > {quote}java.lang.AssertionError: Did not receive all 3 records from topic > output-stream-2 within 60000 ms, currently accumulated data is [] Expected: > is a value equal to or greater than <3> but: <0> was less than <3> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:648) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:368) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:336) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:644) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:617) > at > org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:439){quote} > STDERR > {quote}java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.GroupSubscribedToTopicException: Deleting > offsets of a topic is forbidden while the consumer group is actively > subscribed to it. 
at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) > at > org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper.lambda$removeNamedTopology$3(KafkaStreamsNamedTopologyWrapper.java:213) > at > org.apache.kafka.common.internals.KafkaFutureImpl.lambda$whenComplete$2(KafkaFutureImpl.java:107) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) > at > org.apache.kafka.common.internals.KafkaCompletableFuture.kafkaComplete(KafkaCompletableFuture.java:39) > at > org.apache.kafka.common.internals.KafkaFutureImpl.complete(KafkaFutureImpl.java:122) > at > org.apache.kafka.streams.processor.internals.TopologyMetadata.maybeNotifyTopologyVersionWaiters(TopologyMetadata.java:154) > at > org.apache.kafka.streams.processor.internals.StreamThread.checkForTopologyUpdates(StreamThread.java:916) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:598) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:575) > Caused by: org.apache.kafka.common.errors.GroupSubscribedToTopicException: > Deleting offsets of a topic is forbidden while the consumer group is actively > subscribed to it. java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.GroupSubscribedToTopicException: Deleting > offsets of a topic is forbidden while the consumer group is actively > subscribed to it. at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) > at > org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper.lambda$removeNamedTopology$3(KafkaStreamsNamedTopologyWrapper.java:213) > at > org.apache.kafka.common.internals.KafkaFutureImpl.lambda$whenComplete$2(KafkaFutureImpl.java:107) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > jav
[jira] [Assigned] (KAFKA-7000) KafkaConsumer.position should wait for assignment metadata
[ https://issues.apache.org/jira/browse/KAFKA-7000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-7000: - Assignee: Lucas Brutschy (was: Lucas Brutschy) > KafkaConsumer.position should wait for assignment metadata > -- > > Key: KAFKA-7000 > URL: https://issues.apache.org/jira/browse/KAFKA-7000 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: John Roesler >Assignee: Lucas Brutschy >Priority: Blocker > Fix For: 2.0.0 > > > While updating Kafka Streams to stop using the deprecated > Consumer.poll(long), I found that this code unexpectedly throws an exception: > {code:java} > consumer.subscribe(topics); > // consumer.poll(0); <- I've removed this line, which shouldn't be necessary > here. > final Set<TopicPartition> partitions = new HashSet<>(); > for (final String topic : topics) { > for (final PartitionInfo partition : consumer.partitionsFor(topic)) { > partitions.add(new TopicPartition(partition.topic(), > partition.partition())); > } > } > for (final TopicPartition tp : partitions) { > final long offset = consumer.position(tp); > committedOffsets.put(tp, offset); > }{code} > Here is the exception: > {code:java} > Exception in thread "main" java.lang.IllegalStateException: You can only > check the position for partitions assigned to this consumer. >at > org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1620) >at > org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1586) >at > org.apache.kafka.streams.tests.EosTestDriver.getCommittedOffsets(EosTestDriver.java:275) >at > org.apache.kafka.streams.tests.EosTestDriver.verify(EosTestDriver.java:148) >at > org.apache.kafka.streams.tests.StreamsEosTest.main(StreamsEosTest.java:69){code} > > As you can see in the commented code in my snippet, we used to block for > assignment with a poll(0), which is now deprecated. > It seems reasonable to me for position() to do the same thing that poll() > does, which is call `coordinator.poll(timeout.toMillis())` early in > processing to ensure an up-to-date assignment. -- This message was sent by Atlassian Jira (v8.20.10#820010)
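For context, a sketch of the calling pattern once position() waits for assignment metadata itself; the overload taking an explicit Duration was added alongside KIP-266, and the 30-second timeout below is an arbitrary choice for illustration.
{code:java}
// Sketch of the post-fix calling pattern: position() blocks until the
// position is known (bounded by default.api.timeout.ms, or by the explicit
// Duration used here), so no warm-up poll(0) is needed. The `partitions` set
// is built from partitionsFor() as in the snippet above and must contain
// partitions assigned to this consumer.
consumer.subscribe(topics);
for (final TopicPartition tp : partitions) {
    final long offset = consumer.position(tp, Duration.ofSeconds(30)); // waits for metadata
    committedOffsets.put(tp, offset);
}
{code}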
[jira] [Assigned] (KAFKA-14432) RocksDBStore relies on finalizers to not leak memory
[ https://issues.apache.org/jira/browse/KAFKA-14432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-14432: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > RocksDBStore relies on finalizers to not leak memory > > > Key: KAFKA-14432 > URL: https://issues.apache.org/jira/browse/KAFKA-14432 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Blocker > Fix For: 3.4.0 > > > Relying on finalizers in RocksDB has been deprecated for a long time, and > starting with rocksdb 7, finalizers are removed completely (see > [https://github.com/facebook/rocksdb/pull/9523]). > Kafka Streams currently relies in part on finalizers to avoid leaking memory. > This needs to be resolved before we can upgrade to RocksDB 7. > See [https://github.com/apache/kafka/pull/12809] . > This is a native heap profile after running Kafka Streams without finalizers > for a few hours: > {code:java} > Total: 13547.5 MB > 12936.3 95.5% 95.5% 12936.3 95.5% rocksdb::port::cacheline_aligned_alloc > 438.5 3.2% 98.7% 438.5 3.2% rocksdb::BlockFetcher::ReadBlockContents > 84.0 0.6% 99.3% 84.2 0.6% rocksdb::Arena::AllocateNewBlock > 45.9 0.3% 99.7% 45.9 0.3% prof_backtrace_impl > 8.1 0.1% 99.7% 14.6 0.1% rocksdb::BlockBasedTable::PutDataBlockToCache > 6.4 0.0% 99.8% 12941.4 95.5% Java_org_rocksdb_Statistics_newStatistics___3BJ > 6.1 0.0% 99.8% 6.9 0.1% rocksdb::LRUCacheShard::Insert@2d8b20 > 5.1 0.0% 99.9% 6.5 0.0% rocksdb::VersionSet::ProcessManifestWrites > 3.9 0.0% 99.9% 3.9 0.0% rocksdb::WritableFileWriter::WritableFileWriter > 3.2 0.0% 99.9% 3.2 0.0% std::string::_Rep::_S_create{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
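To make the constraint concrete, here is a small standalone example of the explicit-close discipline RocksDB 7 requires (the path and key are arbitrary); this illustrates the general rule, not the Kafka Streams fix itself.
{code:java}
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class ExplicitCloseExample {
    public static void main(final String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        // Every RocksObject holds native (off-heap) memory. try-with-resources
        // frees it deterministically; once finalizers are gone, dropping the
        // Java references without close() leaks the native allocation.
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/rocksdb-example")) {
            db.put("k".getBytes(), "v".getBytes());
        }
    }
}
{code}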
[jira] [Assigned] (KAFKA-14530) Check state updater more than once in process loops
[ https://issues.apache.org/jira/browse/KAFKA-14530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-14530: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Check state updater more than once in process loops > --- > > Key: KAFKA-14530 > URL: https://issues.apache.org/jira/browse/KAFKA-14530 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Minor > Fix For: 3.5.0 > > > In the new state restoration code, the state updater needs to be checked > regularly by the main thread to transfer ownership of tasks back to the main > thread once the state of the task is restored. The more often we check this, > the faster we can start processing the tasks. > Currently, we only check the state updater once in every loop iteration of > the main thread. And while we have not observed the current frequency to be > insufficient, we can easily increase the number of checks by moving the check > inside the inner processing loop. This would mean that once we have iterated > over `numIterations` records, we can already start processing tasks that have > finished restoration in the meantime. -- This message was sent by Atlassian Jira (v8.20.10#820010)
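A hypothetical sketch of the shape of this change; the method names only approximate the internal TaskManager API and are not quoted from the code.
{code:java}
// Hypothetical sketch: move the state-updater check into the inner
// processing loop so tasks that finished restoring are handed back after
// at most one pass over `numIterations` records, rather than once per
// outer runLoop iteration.
for (int i = 0; i < numIterations; i++) {
    taskManager.process(1, time);                        // process one record per task
    taskManager.checkStateUpdater(time.milliseconds());  // pick up freshly restored tasks
}
{code}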
[jira] [Assigned] (KAFKA-15832) Trigger client reconciliation based on manager poll
[ https://issues.apache.org/jira/browse/KAFKA-15832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15832: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Trigger client reconciliation based on manager poll > --- > > Key: KAFKA-15832 > URL: https://issues.apache.org/jira/browse/KAFKA-15832 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lucas Brutschy >Priority: Critical > Labels: kip-848-client-support, reconciliation > Fix For: 3.8.0 > > > Currently the reconciliation logic on the client is triggered when a new > target assignment is received and resolved, or when new unresolved target > assignments are discovered in metadata. > This could be improved by triggering the reconciliation logic on each poll > iteration, to reconcile whatever is ready to be reconciled. This would > require changes to support poll on the MembershipManager and to integrate it > with the current polling logic in the background thread. Receiving a new > target assignment from the broker, or resolving new topic names via a > metadata update, would then only ensure that #assignmentReadyToReconcile is > properly updated (as is currently done), but would not trigger the > #reconcile() logic itself, leaving that to the #poll() operation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
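A hypothetical sketch of the resulting flow; the names are taken from the ticket text, not from the final code.
{code:java}
// Hypothetical sketch: reconciliation is driven from the manager's poll(),
// invoked on every background-thread iteration, rather than from the
// events that update the reconcilable assignment.
public void poll(final long currentTimeMs) {
    if (!assignmentReadyToReconcile.isEmpty()) {
        reconcile(); // reconcile whatever is ready on this iteration
    }
}
{code}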
[jira] [Assigned] (KAFKA-14415) ThreadCache is getting slower with every additional state store
[ https://issues.apache.org/jira/browse/KAFKA-14415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-14415: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > ThreadCache is getting slower with every additional state store > --- > > Key: KAFKA-14415 > URL: https://issues.apache.org/jira/browse/KAFKA-14415 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > Fix For: 3.4.0 > > > There are a few lines in `ThreadCache` that I think should be optimized. > `sizeBytes` is called at least once, and potentially many times in every > `put` and is linear in the number of caches (= number of state stores, so > typically proportional to number of tasks). That means, with every additional > task, every put gets a little slower. Compare the throughput of TIME_ROCKS on > trunk (green graph): > [http://kstreams-benchmark-results.s3-website-us-west-2.amazonaws.com/experiments/stateheavy-3-5-3-4-0-51b7eb7937-jenkins-20221113214104-streamsbench/] > The throughput of TIME_ROCKS is 20% higher when a constant-time > `sizeBytes` implementation is used: > [http://kstreams-benchmark-results.s3-website-us-west-2.amazonaws.com/experiments/stateheavy-3-5-LUCASCOMPARE-lucas-20221122140846-streamsbench/] > The same seems to apply for the MEM backend (initial throughput >8000 instead > of 6000), however, I cannot run the same benchmark here because the memory is > filled too quickly. > [http://kstreams-benchmark-results.s3-website-us-west-2.amazonaws.com/experiments/stateheavy-3-5-LUCASSTATE-lucas-20221121231632-streamsbench/] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
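A minimal sketch of the constant-time alternative; this is a simplified stand-in for the ThreadCache internals, not the actual change.
{code:java}
import java.util.concurrent.atomic.AtomicLong;

// Minimal sketch: keep a running total that is updated on every insert and
// evict, so sizeBytes() is O(1) instead of summing over all caches
// (O(number of state stores)) on every put.
class CacheSizeTracker {
    private final AtomicLong totalBytes = new AtomicLong();

    long sizeBytes() {
        return totalBytes.get(); // constant time, independent of cache count
    }

    void onInsert(final long entryBytes) {
        totalBytes.addAndGet(entryBytes);
    }

    void onEvict(final long entryBytes) {
        totalBytes.addAndGet(-entryBytes);
    }
}
{code}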
[jira] [Assigned] (KAFKA-15280) Implement client support for selecting KIP-848 server-side assignor
[ https://issues.apache.org/jira/browse/KAFKA-15280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15280: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Implement client support for selecting KIP-848 server-side assignor > --- > > Key: KAFKA-15280 > URL: https://issues.apache.org/jira/browse/KAFKA-15280 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Lucas Brutschy >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > Fix For: 3.7.0 > > > This includes: > * Validate the client’s configuration for server-side assignor selection > defined in config `group.remote.assignor` > * Include the assignor taken from config in the {{ConsumerGroupHeartbeat}} > request, in the `ServerAssignor` field > * Properly handle UNSUPPORTED_ASSIGNOR errors that may be returned in the HB > response if the server does not support the assignor defined by the consumer. > This task is part of the work to implement support for the new KIP-848 > consumer group protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
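For reference, selecting the server-side assignor from a client looks roughly like this; it assumes a 3.7+ client and a broker with the new group protocol enabled, and "uniform" is one of the assignors the broker ships with.
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// Rough example of the configuration this ticket covers.
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");       // opt in to the KIP-848 protocol
props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "uniform"); // sent in the ConsumerGroupHeartbeat ServerAssignor field
{code}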
[jira] [Assigned] (KAFKA-15690) EosIntegrationTest is flaky.
[ https://issues.apache.org/jira/browse/KAFKA-15690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15690: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > EosIntegrationTest is flaky. > > > Key: KAFKA-15690 > URL: https://issues.apache.org/jira/browse/KAFKA-15690 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Calvin Liu >Assignee: Lucas Brutschy >Priority: Major > Labels: flaky-test > > EosIntegrationTest > shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances[exactly_once_v2, > processing threads = false] > {code:java} > org.junit.runners.model.TestTimedOutException: test timed out after 600 > seconds at > org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleServerDisconnect(NetworkClient.java:) > at > org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:821) > at > org.apache.kafka.clients.NetworkClient.processTimeoutDisconnection(NetworkClient.java:779) >at > org.apache.kafka.clients.NetworkClient.handleTimedOutRequests(NetworkClient.java:837) > org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. taskId=0_1, processor=KSTREAM-SOURCE-00, > topic=multiPartitionInputTopic, partition=1, offset=15, > stacktrace=java.lang.RuntimeException: Detected we've been interrupted. > at > org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:892) >at > org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:867) >at > org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49) > at > org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38) > at > org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66) > {code} > shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2, processing > threads = false] > {code:java} > java.lang.RuntimeException: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.UnknownServerException: The server experienced > an unexpected error when processing the request. at > org.apache.kafka.streams.integration.utils.KafkaEmbedded.deleteTopic(KafkaEmbedded.java:204) > at > org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:286) >at > org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:274) >at > org.apache.kafka.streams.integration.EosIntegrationTest.createTopics(EosIntegrationTest.java:174) > at > java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) > org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. taskId=0_1, processor=KSTREAM-SOURCE-00, > topic=multiPartitionInputTopic, partition=1, offset=15, > stacktrace=java.lang.RuntimeException: Detected we've been interrupted. 
> at > org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:892) >at > org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:867) >at > org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49) > at > org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38) > at > org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66) > {code} > shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once, processing > threads = false] > {code:java} > org.opentest4j.AssertionFailedError: Condition not met within timeout 60000. > StreamsTasks did not request commit. ==> expected: <true> but was: <false> >at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) >at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) >at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210) > java.lang.IllegalStateException: Replica > [Topic=__transaction_state,Partition=2,Replica=1] should be in the > OfflineReplica,ReplicaDeletionStarted states before moving to > ReplicaDeletionIneligible state. Instead it is in OnlineReplica state
[jira] [Assigned] (KAFKA-14532) Correctly handle failed fetch when partitions unassigned
[ https://issues.apache.org/jira/browse/KAFKA-14532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-14532: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Correctly handle failed fetch when partitions unassigned > > > Key: KAFKA-14532 > URL: https://issues.apache.org/jira/browse/KAFKA-14532 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Blocker > Fix For: 3.4.0, 3.3.2 > > > On master, all our long-running test jobs are running into this exception: > {code:java} > java.lang.IllegalStateException: No current assignment for partition > stream-soak-test-KSTREAM-OUTERTHIS-86-store-changelog-1 > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:370) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.clearPreferredReadReplica(SubscriptionState.java:623) > at java.util.LinkedHashMap$LinkedKeySet.forEach(LinkedHashMap.java:559) > at > org.apache.kafka.clients.consumer.internals.Fetcher$1.onFailure(Fetcher.java:349) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:179) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:149) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:613) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:427) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1307) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) > at > org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:450) > at > org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:910) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:773) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:613) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:575) > [2022-12-13 04:01:59,024] ERROR [i-016cf5d2c1889c316-StreamThread-1] > stream-client [i-016cf5d2c1889c316] Encountered the following exception > during processing and sent shutdown request for the entire application. 
> (org.apache.kafka.streams.KafkaStreams) > org.apache.kafka.streams.errors.StreamsException: > java.lang.IllegalStateException: No current assignment for partition > stream-soak-test-KSTREAM-OUTERTHIS-86-store-changelog-1 > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:653) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:575) > Caused by: java.lang.IllegalStateException: No current assignment for > partition stream-soak-test-KSTREAM-OUTERTHIS-86-store-changelog-1 > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:370) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.clearPreferredReadReplica(SubscriptionState.java:623) > at java.util.LinkedHashMap$LinkedKeySet.forEach(LinkedHashMap.java:559) > at > org.apache.kafka.clients.consumer.internals.Fetcher$1.onFailure(Fetcher.java:349) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:179) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:149) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:613) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:427) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1307) > at > org.apache.kafka.clients.co
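A hypothetical sketch of the guard the fix needs; the real change lives in the fetcher/SubscriptionState internals and the names below are approximate.
{code:java}
// Hypothetical sketch: a fetch can fail and complete after a rebalance has
// already revoked the partition, so "no longer assigned" should be a no-op
// rather than an assertion failure. Types and names approximate
// SubscriptionState, not the committed fix.
public synchronized void clearPreferredReadReplica(final TopicPartition tp) {
    final TopicPartitionState state = assignment.stateValue(tp);
    if (state == null) {
        return; // partition was unassigned while the fetch was in flight
    }
    state.clearPreferredReadReplica();
}
{code}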
[jira] [Assigned] (KAFKA-14309) Kafka Streams upgrade tests do not cover for FK-joins
[ https://issues.apache.org/jira/browse/KAFKA-14309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-14309: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Kafka Streams upgrade tests do not cover for FK-joins > - > > Key: KAFKA-14309 > URL: https://issues.apache.org/jira/browse/KAFKA-14309 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > > The current streams upgrade system test for FK joins inserts the production > of foreign key data and an actual foreign key join in every version of > SmokeTestDriver except for the latest. The effect was that FK join upgrades > are not tested at all, since no FK join code is executed after the bounce in > the system test. > We should enable the FK-join code in the system test. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15803) Update last seen epoch during commit
[ https://issues.apache.org/jira/browse/KAFKA-15803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15803: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Update last seen epoch during commit > > > Key: KAFKA-15803 > URL: https://issues.apache.org/jira/browse/KAFKA-15803 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Lucas Brutschy >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support, > kip-848-e2e, kip-848-preview > Fix For: 3.7.0 > > > At the time we implemented commitAsync in the prototypeAsyncConsumer, > metadata was not there. The ask here is to investigate if we need to add the > following function to the commit code: > {code:java}
private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
    if (offsetAndMetadata != null)
        offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
}
{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15326) Decouple Processing Thread from Polling Thread
[ https://issues.apache.org/jira/browse/KAFKA-15326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15326: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Decouple Processing Thread from Polling Thread > -- > > Key: KAFKA-15326 > URL: https://issues.apache.org/jira/browse/KAFKA-15326 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Critical > > As part of an ongoing effort to implement a better threading architecture in > Kafka streams, we decouple N stream threads into N polling threads and N > processing threads. The effort to consolidate the N polling threads into a > single thread is a follow-up to this ticket. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15942) Implement ConsumerInterceptor
[ https://issues.apache.org/jira/browse/KAFKA-15942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15942: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Implement ConsumerInterceptor > - > > Key: KAFKA-15942 > URL: https://issues.apache.org/jira/browse/KAFKA-15942 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: Philip Nee >Assignee: Lucas Brutschy >Priority: Blocker > Labels: consumer-threading-refactor, interceptors > Fix For: 3.8.0 > > > As the title says, we need to implement ConsumerInterceptor in the > AsyncKafkaConsumer. > > This is the current code; the implementation would be very similar: > {code:java} > if (interceptors != null) > interceptors.onCommit(offsets); {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
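For reference, a minimal interceptor against the public API that exercises both callbacks; this is only an illustration useful for checking the wiring, not the change the ticket asks for.
{code:java}
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class LoggingInterceptor<K, V> implements ConsumerInterceptor<K, V> {
    @Override
    public void configure(final Map<String, ?> configs) { }

    @Override
    public ConsumerRecords<K, V> onConsume(final ConsumerRecords<K, V> records) {
        return records; // called before poll() returns; pass records through unchanged
    }

    @Override
    public void onCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) {
        // called after offsets are committed, including auto-commits
        offsets.forEach((tp, om) -> System.out.printf("committed %s -> %d%n", tp, om.offset()));
    }

    @Override
    public void close() { }
}
{code}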
[jira] [Assigned] (KAFKA-16097) State updater removes task without pending action in EOSv2
[ https://issues.apache.org/jira/browse/KAFKA-16097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-16097: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > State updater removes task without pending action in EOSv2 > -- > > Key: KAFKA-16097 > URL: https://issues.apache.org/jira/browse/KAFKA-16097 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.8.0 >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > > A long-running soak encountered the following exception: > > {code:java} > [2024-01-08 03:06:00,586] ERROR [i-081c089d2ed054443-StreamThread-3] Thread > encountered an error processing soak test > (org.apache.kafka.streams.StreamsSoakTest) > java.lang.IllegalStateException: Got a removed task 1_0 from the state > updater that is not for recycle, closing, or updating input partitions; this > should not happen > at > org.apache.kafka.streams.processor.internals.TaskManager.handleRemovedTasksFromStateUpdater(TaskManager.java:939) > at > org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:788) > at > org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1141) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:949) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645) > [2024-01-08 03:06:00,587] ERROR [i-081c089d2ed054443-StreamThread-3] > stream-client [i-081c089d2ed054443] Encountered the following exception > during processing and sent shutdown request for the entire application. > (org.apache.kafka.streams.KafkaStreams) > org.apache.kafka.streams.errors.StreamsException: > java.lang.IllegalStateException: Got a removed task 1_0 from the state > updater that is not for recycle, closing, or updating input partitions; this > should not happen > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:729) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645) > Caused by: java.lang.IllegalStateException: Got a removed task 1_0 from the > state updater that is not for recycle, closing, or updating input partitions; > this should not happen > at > org.apache.kafka.streams.processor.internals.TaskManager.handleRemovedTasksFromStateUpdater(TaskManager.java:939) > at > org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:788) > at > org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1141) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:949) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686) > ... 1 more{code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15798) Flaky Test NamedTopologyIntegrationTest.shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology()
[ https://issues.apache.org/jira/browse/KAFKA-15798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15798: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Flaky Test > NamedTopologyIntegrationTest.shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology() > - > > Key: KAFKA-15798 > URL: https://issues.apache.org/jira/browse/KAFKA-15798 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Justine Olshan >Assignee: Lucas Brutschy >Priority: Major > Labels: flaky-test > > I saw a few examples recently. 2 have the same error, but the third is > different > [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14629/22/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology___2/] > [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2365/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_21_and_Scala_2_13___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology__/] > > The failure is like > {code:java} > java.lang.AssertionError: Did not receive all 5 records from topic > output-stream-1 within 60000 ms, currently accumulated data is [] Expected: > is a value equal to or greater than <5> but: <0> was less than <5>{code} > The other failure was > [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2365/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology__/] > {code:java} > java.lang.AssertionError: Expected: <[0, 1]> but: was <[0]>{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16220) KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions is flaky
[ https://issues.apache.org/jira/browse/KAFKA-16220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-16220: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions > is flaky > > > Key: KAFKA-16220 > URL: https://issues.apache.org/jira/browse/KAFKA-16220 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > Labels: flaky, flaky-test > > This test has seen significant flakiness > > https://ge.apache.org/s/fac7lploprvuu/tests/task/:streams:test/details/org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest/shouldThrowIllegalArgumentExceptionWhenCustomPartitionerReturnsMultiplePartitions()?top-execution=1 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15865) Ensure consumer.poll() execute autocommit callback
[ https://issues.apache.org/jira/browse/KAFKA-15865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15865: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Ensure consumer.poll() execute autocommit callback > -- > > Key: KAFKA-15865 > URL: https://issues.apache.org/jira/browse/KAFKA-15865 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Lucas Brutschy >Priority: Major > Labels: consumer-threading-refactor, kip-848-preview > Fix For: 3.7.0 > > > When the network thread completes autocommits, we need to send a > message/event to the application to notify the thread to execute the > callback. In KAFKA-15327, the network thread sends a > AutoCommitCompletionBackgroundEvent to the polling thread. The polling > thread should trigger the OffsetCommitCallback upon receiving it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
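A hypothetical sketch of the dispatch described above; the event name comes from the ticket, while the surrounding types are simplified stand-ins.
{code:java}
// Hypothetical sketch: during poll(), the application (polling) thread
// drains events queued by the network thread and runs the user's
// OffsetCommitCallback on the application thread, never on the network
// thread. Names other than AutoCommitCompletionBackgroundEvent are
// simplified stand-ins.
private void processBackgroundEvents() {
    BackgroundEvent event;
    while ((event = backgroundEventQueue.poll()) != null) {
        if (event instanceof AutoCommitCompletionBackgroundEvent) {
            final AutoCommitCompletionBackgroundEvent completion =
                (AutoCommitCompletionBackgroundEvent) event;
            offsetCommitCallback.onComplete(completion.offsets(), completion.error());
        }
    }
}
{code}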
[jira] [Assigned] (KAFKA-14278) Convert INVALID_PRODUCER_EPOCH into PRODUCER_FENCED TxnOffsetCommit
[ https://issues.apache.org/jira/browse/KAFKA-14278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-14278: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Convert INVALID_PRODUCER_EPOCH into PRODUCER_FENCED TxnOffsetCommit > --- > > Key: KAFKA-14278 > URL: https://issues.apache.org/jira/browse/KAFKA-14278 > Project: Kafka > Issue Type: Sub-task > Components: producer , streams >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > Fix For: 3.6.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15319) Upgrade rocksdb to fix CVE-2022-37434
[ https://issues.apache.org/jira/browse/KAFKA-15319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-15319: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Upgrade rocksdb to fix CVE-2022-37434 > - > > Key: KAFKA-15319 > URL: https://issues.apache.org/jira/browse/KAFKA-15319 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.4.1 >Reporter: Maruthi >Assignee: Lucas Brutschy >Priority: Critical > Fix For: 3.6.0, 3.5.2 > > Attachments: compat_report.html.zip > > > Rocksdbjni<7.9.2 is vulnerable to CVE-2022-37434 due to zlib 1.2.12 > Upgrade to 1.2.13 to fix > https://github.com/facebook/rocksdb/commit/0993c9225f8086bab6c4c0a2d7206897d1cc688c -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14299) Benchmark and stabilize state updater
[ https://issues.apache.org/jira/browse/KAFKA-14299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-14299: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Benchmark and stabilize state updater > - > > Key: KAFKA-14299 > URL: https://issues.apache.org/jira/browse/KAFKA-14299 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > > We need to benchmark and stabilize the separate state restoration code path. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-12935) Flaky Test RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore
[ https://issues.apache.org/jira/browse/KAFKA-12935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-12935: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Flaky Test > RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore > > > Key: KAFKA-12935 > URL: https://issues.apache.org/jira/browse/KAFKA-12935 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Matthias J. Sax >Assignee: Lucas Brutschy >Priority: Critical > Labels: flaky-test > Fix For: 3.4.0 > > Attachments: > RestoreIntegrationTest#shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore[true].rtf > > > {quote}java.lang.AssertionError: Expected: <0L> but: was <5005L> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at > org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(RestoreIntegrationTest.java:374) > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-9545) Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`
[ https://issues.apache.org/jira/browse/KAFKA-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-9545: - Assignee: Lucas Brutschy (was: Lucas Brutschy) > Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted` > -- > > Key: KAFKA-9545 > URL: https://issues.apache.org/jira/browse/KAFKA-9545 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Jason Gustafson >Assignee: Lucas Brutschy >Priority: Major > > https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4678/testReport/org.apache.kafka.streams.integration/RegexSourceIntegrationTest/testRegexMatchesTopicsAWhenDeleted/ > {code} > java.lang.AssertionError: Condition not met within timeout 15000. Stream > tasks not updated > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:367) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:337) > at > org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:224) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16077) Streams fails to close task after restoration when input partitions are updated
[ https://issues.apache.org/jira/browse/KAFKA-16077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-16077: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Streams fails to close task after restoration when input partitions are > updated > --- > > Key: KAFKA-16077 > URL: https://issues.apache.org/jira/browse/KAFKA-16077 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Blocker > Labels: streams > > There is a race condition in the state updater that can cause the following: > # We have an active task in the state updater > # We get fenced. We recreate the producer, transactions now uninitialized. > We ask the state updater to give back the task, add a pending action to close > the task clean once it’s handed back > # We get a new assignment with updated input partitions. The task is still > owned by the state updater, so we ask the state updater again to hand it back > and add a pending action to update its input partition > # The task is handed back by the state updater. We update its input > partitions but forget to close it clean (pending action was overwritten) > # Now the task is in an initialized state, but the underlying producer does > not have transactions initialized > This can lead to an exception like this: > {code:java} > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log-org.apache.kafka.streams.errors.StreamsException: > Exception caught in process. taskId=1_0, > processor=KSTREAM-SOURCE-05, topic=node-name-repartition, > partition=0, offset=618798, stacktrace=java.lang.IllegalStateException: > TransactionalId stream-soak-test-d647640a-12e5-4e74-a0af-e105d0d0cb67-2: > Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:999) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:985) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:311) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:660) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.StreamsProducer.maybeBeginTransaction(StreamsProducer.java:240) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:258) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:253) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.doJoin(KStreamKTableJoinProcessor.java:130) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:99) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291) > streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at > org.apache.k
[jira] [Assigned] (KAFKA-16155) Investigate testAutoCommitIntercept
[ https://issues.apache.org/jira/browse/KAFKA-16155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-16155: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Investigate testAutoCommitIntercept > --- > > Key: KAFKA-16155 > URL: https://issues.apache.org/jira/browse/KAFKA-16155 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Critical > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > Even with KAFKA-15942, the test PlaintextConsumerTest.testAutoCommitIntercept > flakes on the initial setup (before using interceptors, so interceptors > are unrelated here, except for being used later in the test). > The problem is that we are seeking two topic partitions to offsets 10 and 20, > respectively, but when we commit, we seem to have lost one of the offsets, > likely due to a race condition. > When I output `subscriptionState.allConsumed` repeatedly, I get this output: > {noformat} > allConsumed: {topic-0=OffsetAndMetadata{offset=100, leaderEpoch=0, > metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, > metadata=''}} > seeking topic-0 to FetchPosition{offset=10, offsetEpoch=Optional.empty, > currentLeader=LeaderAndEpoch{leader=Optional[localhost:58298 (id: 0 rack: > null)], epoch=0}} > seeking topic-1 to FetchPosition{offset=20, offsetEpoch=Optional.empty, > currentLeader=LeaderAndEpoch{leader=Optional[localhost:58301 (id: 1 rack: > null)], epoch=0}} > allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, > metadata=''}, topic-1=OffsetAndMetadata{offset=20, leaderEpoch=null, > metadata=''}} > allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, > metadata=''}} > allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, > metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, > metadata=''}} > autocommit start {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, > metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, > metadata=''}} > {noformat} > So after we seek to 10 / 20, we lose one of the offsets, maybe because we > haven't reconciled the assignment yet. Later, we get the second topic > partition assigned, but the offset is initialized to 0. > We should investigate whether this can be made more like the behavior in the > original consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
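For reference, the pattern the test exercises reduces to roughly the following; the topic name and offsets match the log above, and it is the auto-commit after the seeks that the race interferes with.
{code:java}
// Reduced illustration of the test's setup: seek two assigned partitions,
// then rely on auto-commit. With the race described above, one of the
// seeked positions is occasionally lost before the auto-commit fires.
consumer.subscribe(java.util.Collections.singletonList("topic"));
while (consumer.assignment().size() < 2) {
    consumer.poll(java.time.Duration.ofMillis(100)); // wait until both partitions are assigned
}
consumer.seek(new org.apache.kafka.common.TopicPartition("topic", 0), 10);
consumer.seek(new org.apache.kafka.common.TopicPartition("topic", 1), 20);
// expected next auto-commit: {topic-0=10, topic-1=20}
{code}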
[jira] [Assigned] (KAFKA-16243) Idle kafka-console-consumer with new consumer group protocol preemptively leaves group
[ https://issues.apache.org/jira/browse/KAFKA-16243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-16243: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Idle kafka-console-consumer with new consumer group protocol preemptively > leaves group > -- > > Key: KAFKA-16243 > URL: https://issues.apache.org/jira/browse/KAFKA-16243 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.7.0 >Reporter: Andrew Schofield >Assignee: Lucas Brutschy >Priority: Critical > > Using the new consumer group protocol with kafka-console-consumer.sh, I find > that if I leave the consumer with no records to process for 5 minutes > (max.poll.interval.ms = 300000ms), the tool logs the following warning > message and leaves the group. > "consumer poll timeout has expired. This means the time between subsequent > calls to poll() was longer than the configured max.poll.interval.ms, which > typically implies that the poll loop is spending too much time processing > messages. You can address this either by increasing max.poll.interval.ms or > by reducing the maximum size of batches returned in poll() with > max.poll.records." > With the older consumer, this did not occur. > The reason is that the consumer keeps a poll timer which is used to ensure > liveness of the application thread. In the older consumer, the poll timer > automatically updates while the `Consumer.poll(Duration)` method is blocked, > whereas the newer consumer only updates the poll timer when a new call to > `Consumer.poll(Duration)` is issued. This means that the > kafka-console-consumer.sh tool, which uses a very long poll timeout by default, > works differently with the new consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
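As a minimal sketch of the consumption pattern that triggers this, assuming a 3.7+ client with the new group protocol enabled (the bootstrap address, group id, and topic name are illustrative, not from the report):

{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class IdleConsoleLikeConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumed
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "console-like-group");       // assumed
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");           // new group protocol
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("idle-topic"));                         // assumed topic
            while (true) {
                // A single blocking poll far longer than max.poll.interval.ms (300000 ms).
                // The classic consumer keeps the poll timer fresh while blocked inside
                // poll(); the new consumer only resets the timer on each new poll() call,
                // so an idle consumer here is treated as stalled and leaves the group.
                consumer.poll(Duration.ofMinutes(10))
                        .forEach(record -> System.out.println(record.value()));
            }
        }
    }
}
{code}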
[jira] [Updated] (KAFKA-16155) Investigate testAutoCommitIntercept
[ https://issues.apache.org/jira/browse/KAFKA-16155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-16155: --- Reporter: Lucas Brutschy (was: Lucas Brutschy) > Investigate testAutoCommitIntercept > --- > > Key: KAFKA-16155 > URL: https://issues.apache.org/jira/browse/KAFKA-16155 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Critical > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > Even with KAFKA-15942, the test PlaintextConsumerTest.testAutoCommitIntercept > flakes on the initial setup (before using interceptors, so interceptors > are unrelated here, except for being used later in the test). > The problem is that we are seeking two topic partitions to offset 10 and 20, > respectively, but when we commit, we seem to have lost one of the offsets, > likely due to a race condition. > When I output `subscriptionState.allConsumed` repeatedly, I get this output: > {noformat} > allConsumed: {topic-0=OffsetAndMetadata{offset=100, leaderEpoch=0, > metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, > metadata=''}} > seeking topic-0 to FetchPosition{offset=10, offsetEpoch=Optional.empty, > currentLeader=LeaderAndEpoch{leader=Optional[localhost:58298 (id: 0 rack: > null)], epoch=0}} > seeking topic-1 to FetchPosition{offset=20, offsetEpoch=Optional.empty, > currentLeader=LeaderAndEpoch{leader=Optional[localhost:58301 (id: 1 rack: > null)], epoch=0}} > allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, > metadata=''}, topic-1=OffsetAndMetadata{offset=20, leaderEpoch=null, > metadata=''}} > allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, > metadata=''}} > allConsumed: {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, > metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, > metadata=''}} > autocommit start {topic-0=OffsetAndMetadata{offset=10, leaderEpoch=null, > metadata=''}, topic-1=OffsetAndMetadata{offset=0, leaderEpoch=null, > metadata=''}} > {noformat} > So after we seek to 10 / 20, we lose one of the offsets, maybe because we > haven't reconciled the assignment yet. Later, we get the second topic > partition assigned, but the offset is initialized to 0. > We should investigate whether this can be made more like the behavior in the > original consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16009) Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation
[ https://issues.apache.org/jira/browse/KAFKA-16009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-16009: -- Assignee: Lucas Brutschy (was: Lucas Brutschy) > Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation > > > Key: KAFKA-16009 > URL: https://issues.apache.org/jira/browse/KAFKA-16009 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Lucas Brutschy >Priority: Critical > Labels: consumer-threading-refactor, integration-tests, timeout > Fix For: 3.8.0 > > > The integration test > {{PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation}} is failing > when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Timed out before expected rebalance > completed > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317) > at > kafka.api.PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation(PlaintextConsumerTest.scala:235) > {code} > The logs include this line: > > {code} > [2023-12-13 15:20:27,824] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16009) Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation
[ https://issues.apache.org/jira/browse/KAFKA-16009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-16009: -- Assignee: Lucas Brutschy (was: Kirk True) > Fix PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation > > > Key: KAFKA-16009 > URL: https://issues.apache.org/jira/browse/KAFKA-16009 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Lucas Brutschy >Priority: Critical > Labels: consumer-threading-refactor, integration-tests, timeout > Fix For: 3.8.0 > > > The integration test > {{PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation}} is failing > when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Timed out before expected rebalance > completed > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317) > at > kafka.api.PlaintextConsumerTest.testMaxPollIntervalMsDelayInRevocation(PlaintextConsumerTest.scala:235) > {code} > The logs include this line: > > {code} > [2023-12-13 15:20:27,824] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16248) Kafka consumer should cache leader offset ranges
[ https://issues.apache.org/jira/browse/KAFKA-16248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-16248: --- Priority: Critical (was: Major) > Kafka consumer should cache leader offset ranges > > > Key: KAFKA-16248 > URL: https://issues.apache.org/jira/browse/KAFKA-16248 > Project: Kafka > Issue Type: Bug >Reporter: Lucas Brutschy >Priority: Critical > > We noticed a streams application received an OFFSET_OUT_OF_RANGE error > following a network partition and streams task rebalance and subsequently > reset its offsets to the beginning. > Inspecting the logs, we saw multiple consumer log messages like: > {code:java} > Setting offset for partition tp to the committed offset > FetchPosition{offset=1234, offsetEpoch=Optional.empty...) > {code} > Inspecting the streams code, it looks like kafka streams calls `commitSync` > passing through an explicit OffsetAndMetadata object but does not populate > the offset leader epoch. > The offset leader epoch is required in the offset commit to ensure that all > consumers in the consumer group have coherent metadata before fetching. > Otherwise after a consumer group rebalance, a consumer may fetch with a stale > leader epoch with respect to the committed offset and get an offset out of > range error from a zombie partition leader. > An example of where this can cause issues: > 1. We have a consumer group with consumer 1 and consumer 2. Partition P is > assigned to consumer 1 which has up-to-date metadata for P. Consumer 2 has > stale metadata for P. > 2. Consumer 1 fetches partition P with offset 50, epoch 8, and commits offset > 50 without an epoch. > 3. The consumer group rebalances and P is now assigned to consumer 2. > Consumer 2 has a stale leader epoch for P (let's say leader epoch 7). > Consumer 2 will now try to fetch with leader epoch 7, offset 50. If we have a > zombie leader due to a network partition, the zombie leader may accept > consumer 2's fetch leader epoch and return an OFFSET_OUT_OF_RANGE to consumer > 2. > If in step 1, consumer 1 committed the leader epoch for the message, then > when consumer 2 receives assignment P it would force a metadata refresh to > discover a sufficiently new leader epoch for the committed offset. > Kafka Streams cannot fully determine the leader epoch of the offsets it wants > to commit - in EOS mode, streams commits the offset after the last control > records (to avoid always having a lag of >0), but the leader epoch of the > control record is not known to streams (since only non-control records are > returned from Consumer.poll). > A fix discussed with [~hachikuji] is to have the consumer cache leader epoch > ranges, similar to how the broker maintains a leader epoch cache. -- This message was sent by Atlassian Jira (v8.20.10#820010)
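The ticket only sketches the fix by analogy to the broker; the following is a hypothetical illustration of what a consumer-side cache of leader epoch ranges could look like (class and method names are invented for illustration, not the actual KAFKA-16248 implementation):

{code:java}
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical consumer-side cache mapping offset ranges to leader epochs,
// analogous to the broker's leader-epoch cache.
final class LeaderEpochRangeCache {
    // Start offset of each epoch range -> leader epoch of that range.
    private final TreeMap<Long, Integer> epochStarts = new TreeMap<>();

    // Record that `epoch` was in effect from `startOffset` onwards,
    // e.g. learned from the leader epochs of fetched record batches.
    void assign(int epoch, long startOffset) {
        epochStarts.put(startOffset, epoch);
    }

    // Look up the epoch in effect at `offset`, if the cache covers it. This would
    // let the consumer attach an epoch to a committed offset even when the offset
    // points just past a control record whose epoch the application never sees.
    Optional<Integer> epochForOffset(long offset) {
        Map.Entry<Long, Integer> entry = epochStarts.floorEntry(offset);
        return entry == null ? Optional.empty() : Optional.of(entry.getValue());
    }
}
{code}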
[jira] [Updated] (KAFKA-16248) Kafka consumer should cache leader offset ranges
[ https://issues.apache.org/jira/browse/KAFKA-16248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-16248: --- Description: We noticed a streams application received an OFFSET_OUT_OF_RANGE error following a network partition and streams task rebalance and subsequently reset its offsets to the beginning. Inspecting the logs, we saw multiple consumer log messages like: {code:java} Setting offset for partition tp to the committed offset FetchPosition{offset=1234, offsetEpoch=Optional.empty...) {code} Inspecting the streams code, it looks like kafka streams calls `commitSync` passing through an explicit OffsetAndMetadata object but does not populate the offset leader epoch. The offset leader epoch is required in the offset commit to ensure that all consumers in the consumer group have coherent metadata before fetching. Otherwise after a consumer group rebalance, a consumer may fetch with a stale leader epoch with respect to the committed offset and get an offset out of range error from a zombie partition leader. An example of where this can cause issues: 1. We have a consumer group with consumer 1 and consumer 2. Partition P is assigned to consumer 1 which has up-to-date metadata for P. Consumer 2 has stale metadata for P. 2. Consumer 1 fetches partition P with offset 50, epoch 8, and commits offset 50 without an epoch. 3. The consumer group rebalances and P is now assigned to consumer 2. Consumer 2 has a stale leader epoch for P (let's say leader epoch 7). Consumer 2 will now try to fetch with leader epoch 7, offset 50. If we have a zombie leader due to a network partition, the zombie leader may accept consumer 2's fetch leader epoch and return an OFFSET_OUT_OF_RANGE to consumer 2. If in step 1, consumer 1 committed the leader epoch for the message, then when consumer 2 receives assignment P it would force a metadata refresh to discover a sufficiently new leader epoch for the committed offset. Kafka Streams cannot fully determine the leader epoch of the offsets it wants to commit - in EOS mode, streams commits the offset after the last control records (to avoid always having a lag of >0), but the leader epoch of the control record is not known to streams (since only non-control records are returned from Consumer.poll). A fix discussed with [~hachikuji] is to have the consumer cache leader epoch ranges, similar to how the broker maintains a leader epoch cache. This ticket was split from the original ticket https://issues.apache.org/jira/browse/KAFKA-15344 which was described as a streams fix, but the problem cannot be fully fixed in streams. was: We noticed a streams application received an OFFSET_OUT_OF_RANGE error following a network partition and streams task rebalance and subsequently reset its offsets to the beginning. Inspecting the logs, we saw multiple consumer log messages like: {code:java} Setting offset for partition tp to the committed offset FetchPosition{offset=1234, offsetEpoch=Optional.empty...) {code} Inspecting the streams code, it looks like kafka streams calls `commitSync` passing through an explicit OffsetAndMetadata object but does not populate the offset leader epoch. The offset leader epoch is required in the offset commit to ensure that all consumers in the consumer group have coherent metadata before fetching. Otherwise after a consumer group rebalance, a consumer may fetch with a stale leader epoch with respect to the committed offset and get an offset out of range error from a zombie partition leader. 
An example of where this can cause issues: 1. We have a consumer group with consumer 1 and consumer 2. Partition P is assigned to consumer 1 which has up-to-date metadata for P. Consumer 2 has stale metadata for P. 2. Consumer 1 fetches partition P with offset 50, epoch 8, and commits offset 50 without an epoch. 3. The consumer group rebalances and P is now assigned to consumer 2. Consumer 2 has a stale leader epoch for P (let's say leader epoch 7). Consumer 2 will now try to fetch with leader epoch 7, offset 50. If we have a zombie leader due to a network partition, the zombie leader may accept consumer 2's fetch leader epoch and return an OFFSET_OUT_OF_RANGE to consumer 2. If in step 1, consumer 1 committed the leader epoch for the message, then when consumer 2 receives assignment P it would force a metadata refresh to discover a sufficiently new leader epoch for the committed offset. Kafka Streams cannot fully determine the leader epoch of the offsets it wants to commit - in EOS mode, streams commits the offset after the last control records (to avoid always having a lag of >0), but the leader epoch of the control record is not known to streams (since only non-control records are returned from Consumer.poll). A fix discussed with [~hachikuji] is to have the consumer cache leader epoch ranges, similar to how the broker maintains a leader epoch cache.
[jira] [Updated] (KAFKA-15344) Kafka Streams should include the message leader epoch when committing offsets
[ https://issues.apache.org/jira/browse/KAFKA-15344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-15344: --- Description: We noticed an application received an OFFSET_OUT_OF_RANGE error following a network partition and streams task rebalance and subsequently reset its offsets to the beginning. Inspecting the logs, we saw multiple consumer log messages like: {code:java} Setting offset for partition tp to the committed offset FetchPosition{offset=1234, offsetEpoch=Optional.empty...) {code} Inspecting the streams code, it looks like kafka streams calls `commitSync` passing through an explicit OffsetAndMetadata object but does not populate the offset leader epoch. The offset leader epoch is required in the offset commit to ensure that all consumers in the consumer group have coherent metadata before fetching. Otherwise after a consumer group rebalance, a consumer may fetch with a stale leader epoch with respect to the committed offset and get an offset out of range error from a zombie partition leader. An example of where this can cause issues: 1. We have a consumer group with consumer 1 and consumer 2. Partition P is assigned to consumer 1 which has up-to-date metadata for P. Consumer 2 has stale metadata for P. 2. Consumer 1 fetches partition P with offset 50, epoch 8, and commits offset 50 without an epoch. 3. The consumer group rebalances and P is now assigned to consumer 2. Consumer 2 has a stale leader epoch for P (let's say leader epoch 7). Consumer 2 will now try to fetch with leader epoch 7, offset 50. If we have a zombie leader due to a network partition, the zombie leader may accept consumer 2's fetch leader epoch and return an OFFSET_OUT_OF_RANGE to consumer 2. If in step 1, consumer 1 committed the leader epoch for the message, then when consumer 2 receives assignment P it would force a metadata refresh to discover a sufficiently new leader epoch for the committed offset. The aim of this ticket is to let Kafka Streams commit offsets with leader epochs wherever possible. However, note that Kafka Streams cannot fully determine the leader epoch of the offsets it wants to commit - in EOS mode, streams commits the offset after the last control records (to avoid always having a lag of >0), but the leader epoch of the control record is not known to streams (since only non-control records are returned from Consumer.poll). Therefore, for the EOS case, the above problem cannot be solved with a change in streams alone. Instead, https://issues.apache.org/jira/browse/KAFKA-16248 should be implemented. was: We noticed an application received an OFFSET_OUT_OF_RANGE error following a network partition and streams task rebalance and subsequently reset its offsets to the beginning. Inspecting the logs, we saw multiple consumer log messages like: {code:java} Setting offset for partition tp to the committed offset FetchPosition{offset=1234, offsetEpoch=Optional.empty...) {code} Inspecting the streams code, it looks like kafka streams calls `commitSync` passing through an explicit OffsetAndMetadata object but does not populate the offset leader epoch. The offset leader epoch is required in the offset commit to ensure that all consumers in the consumer group have coherent metadata before fetching. Otherwise after a consumer group rebalance, a consumer may fetch with a stale leader epoch with respect to the committed offset and get an offset out of range error from a zombie partition leader. An example of where this can cause issues: 1. 
We have a consumer group with consumer 1 and consumer 2. Partition P is assigned to consumer 1 which has up-to-date metadata for P. Consumer 2 has stale metadata for P. 2. Consumer 1 fetches partition P with offset 50, epoch 8, and commits offset 50 without an epoch. 3. The consumer group rebalances and P is now assigned to consumer 2. Consumer 2 has a stale leader epoch for P (let's say leader epoch 7). Consumer 2 will now try to fetch with leader epoch 7, offset 50. If we have a zombie leader due to a network partition, the zombie leader may accept consumer 2's fetch leader epoch and return an OFFSET_OUT_OF_RANGE to consumer 2. If in step 1, consumer 1 committed the leader epoch for the message, then when consumer 2 receives assignment P it would force a metadata refresh to discover a sufficiently new leader epoch for the committed offset. The low-hanging fruit fix would be to have streams pass in the message epoch for each commit. Another fix discussed with [~hachikuji] is to have the consumer cache leader epoch ranges, similar to how the broker maintains a leader epoch cache. > Kafka Streams should include the message leader epoch when committing offsets > - > > Key: KAFKA-15344 > URL: https://issues.apache.org/ji
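As a rough illustration of the "low-hanging fruit" part (committing the message's leader epoch where it is known), here is a hedged sketch using the public consumer API; it covers only offsets derived from regular records, not the EOS control-record case described above, and the class and method names are invented for illustration:

{code:java}
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class EpochAwareCommit {
    // Poll once and commit each partition's next offset together with the leader
    // epoch of the last consumed record, when the record carries one.
    static void pollAndCommit(Consumer<String, String> consumer) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
        for (ConsumerRecord<String, String> record : records) {
            TopicPartition tp = new TopicPartition(record.topic(), record.partition());
            // OffsetAndMetadata accepts an optional leader epoch; committing it lets a
            // consumer that later takes over the partition detect stale metadata and
            // refresh before fetching, instead of trusting a zombie leader.
            toCommit.put(tp, new OffsetAndMetadata(record.offset() + 1,
                    record.leaderEpoch(), ""));
        }
        consumer.commitSync(toCommit);
    }
}
{code}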
[jira] [Created] (KAFKA-16248) Kafka consumer should cache leader offset ranges
Lucas Brutschy created KAFKA-16248: -- Summary: Kafka consumer should cache leader offset ranges Key: KAFKA-16248 URL: https://issues.apache.org/jira/browse/KAFKA-16248 Project: Kafka Issue Type: Bug Reporter: Lucas Brutschy We noticed a streams application received an OFFSET_OUT_OF_RANGE error following a network partition and streams task rebalance and subsequently reset its offsets to the beginning. Inspecting the logs, we saw multiple consumer log messages like: {code:java} Setting offset for partition tp to the committed offset FetchPosition{offset=1234, offsetEpoch=Optional.empty...) {code} Inspecting the streams code, it looks like kafka streams calls `commitSync` passing through an explicit OffsetAndMetadata object but does not populate the offset leader epoch. The offset leader epoch is required in the offset commit to ensure that all consumers in the consumer group have coherent metadata before fetching. Otherwise after a consumer group rebalance, a consumer may fetch with a stale leader epoch with respect to the committed offset and get an offset out of range error from a zombie partition leader. An example of where this can cause issues: 1. We have a consumer group with consumer 1 and consumer 2. Partition P is assigned to consumer 1 which has up-to-date metadata for P. Consumer 2 has stale metadata for P. 2. Consumer 1 fetches partition P with offset 50, epoch 8, and commits offset 50 without an epoch. 3. The consumer group rebalances and P is now assigned to consumer 2. Consumer 2 has a stale leader epoch for P (let's say leader epoch 7). Consumer 2 will now try to fetch with leader epoch 7, offset 50. If we have a zombie leader due to a network partition, the zombie leader may accept consumer 2's fetch leader epoch and return an OFFSET_OUT_OF_RANGE to consumer 2. If in step 1, consumer 1 committed the leader epoch for the message, then when consumer 2 receives assignment P it would force a metadata refresh to discover a sufficiently new leader epoch for the committed offset. Kafka Streams cannot fully determine the leader epoch of the offsets it wants to commit - in EOS mode, streams commits the offset after the last control records (to avoid always having a lag of >0), but the leader epoch of the control record is not known to streams (since only non-control records are returned from Consumer.poll). A fix discussed with [~hachikuji] is to have the consumer cache leader epoch ranges, similar to how the broker maintains a leader epoch cache. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16243) Idle kafka-console-consumer with new consumer group protocol preemptively leaves group
[ https://issues.apache.org/jira/browse/KAFKA-16243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-16243: -- Assignee: Lucas Brutschy (was: Andrew Schofield) > Idle kafka-console-consumer with new consumer group protocol preemptively > leaves group > -- > > Key: KAFKA-16243 > URL: https://issues.apache.org/jira/browse/KAFKA-16243 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.7.0 >Reporter: Andrew Schofield >Assignee: Lucas Brutschy >Priority: Critical > > Using the new consumer group protocol with kafka-console-consumer.sh, I find > that if I leave the consumer with no records to process for 5 minutes > (max.poll.interval.ms = 300000ms), the tool logs the following warning > message and leaves the group. > "consumer poll timeout has expired. This means the time between subsequent > calls to poll() was longer than the configured max.poll.interval.ms, which > typically implies that the poll loop is spending too much time processing > messages. You can address this either by increasing max.poll.interval.ms or > by reducing the maximum size of batches returned in poll() with > max.poll.records." > With the older consumer, this did not occur. > The reason is that the consumer keeps a poll timer which is used to ensure > liveness of the application thread. In the older consumer, the poll timer > automatically updates while the `Consumer.poll(Duration)` method is blocked, > whereas the newer consumer only updates the poll timer when a new call to > `Consumer.poll(Duration)` is issued. This means that the > kafka-console-consumer.sh tool, which uses a very long poll timeout by default, > works differently with the new consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16220) KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions is flaky
[ https://issues.apache.org/jira/browse/KAFKA-16220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy resolved KAFKA-16220. Resolution: Fixed > KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions > is flaky > > > Key: KAFKA-16220 > URL: https://issues.apache.org/jira/browse/KAFKA-16220 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > Labels: flaky, flaky-test > > This test has seen significant flakiness > > https://ge.apache.org/s/fac7lploprvuu/tests/task/:streams:test/details/org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest/shouldThrowIllegalArgumentExceptionWhenCustomPartitionerReturnsMultiplePartitions()?top-execution=1 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16220) KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions is flaky
[ https://issues.apache.org/jira/browse/KAFKA-16220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816550#comment-17816550 ] Lucas Brutschy commented on KAFKA-16220: Closing, as it hasn't failed in the last 10 days since the PR. Reopen if it happens again. > KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions > is flaky > > > Key: KAFKA-16220 > URL: https://issues.apache.org/jira/browse/KAFKA-16220 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > Labels: flaky, flaky-test > > This test has seen significant flakiness > > https://ge.apache.org/s/fac7lploprvuu/tests/task/:streams:test/details/org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest/shouldThrowIllegalArgumentExceptionWhenCustomPartitionerReturnsMultiplePartitions()?top-execution=1 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14384) Flaky Test SelfJoinUpgradeIntegrationTest.shouldUpgradeWithTopologyOptimizationOff
[ https://issues.apache.org/jira/browse/KAFKA-14384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814363#comment-17814363 ] Lucas Brutschy commented on KAFKA-14384: Hasn't failed in the last 30 days, so closing it. https://ge.apache.org/scans/tests?search.names=Git%20branch&search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=Europe%2FBerlin&search.values=trunk&tests.container=*SelfJoinUpgradeIntegrationTest > Flaky Test > SelfJoinUpgradeIntegrationTest.shouldUpgradeWithTopologyOptimizationOff > -- > > Key: KAFKA-14384 > URL: https://issues.apache.org/jira/browse/KAFKA-14384 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Critical > Labels: flaky-test > > h3. Stacktrace > java.lang.AssertionError: Did not receive all 5 records from topic > selfjoin-outputSelfJoinUpgradeIntegrationTestshouldUpgradeWithTopologyOptimizationOff > within 6 ms Expected: is a value equal to or greater than <5> but: <0> > was less than <5> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueWithTimestampRecordsReceived$2(IntegrationTestUtils.java:763) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:382) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:350) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(IntegrationTestUtils.java:759) > at > org.apache.kafka.streams.integration.SelfJoinUpgradeIntegrationTest.processKeyValueAndVerifyCount(SelfJoinUpgradeIntegrationTest.java:244) > at > org.apache.kafka.streams.integration.SelfJoinUpgradeIntegrationTest.shouldUpgradeWithTopologyOptimizationOff(SelfJoinUpgradeIntegrationTest.java:155) > > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12835/4/testReport/org.apache.kafka.streams.integration/SelfJoinUpgradeIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldUpgradeWithTopologyOptimizationOff/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14384) Flaky Test SelfJoinUpgradeIntegrationTest.shouldUpgradeWithTopologyOptimizationOff
[ https://issues.apache.org/jira/browse/KAFKA-14384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy resolved KAFKA-14384. Resolution: Fixed > Flaky Test > SelfJoinUpgradeIntegrationTest.shouldUpgradeWithTopologyOptimizationOff > -- > > Key: KAFKA-14384 > URL: https://issues.apache.org/jira/browse/KAFKA-14384 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Critical > Labels: flaky-test > > h3. Stacktrace > java.lang.AssertionError: Did not receive all 5 records from topic > selfjoin-outputSelfJoinUpgradeIntegrationTestshouldUpgradeWithTopologyOptimizationOff > within 6 ms Expected: is a value equal to or greater than <5> but: <0> > was less than <5> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueWithTimestampRecordsReceived$2(IntegrationTestUtils.java:763) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:382) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:350) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(IntegrationTestUtils.java:759) > at > org.apache.kafka.streams.integration.SelfJoinUpgradeIntegrationTest.processKeyValueAndVerifyCount(SelfJoinUpgradeIntegrationTest.java:244) > at > org.apache.kafka.streams.integration.SelfJoinUpgradeIntegrationTest.shouldUpgradeWithTopologyOptimizationOff(SelfJoinUpgradeIntegrationTest.java:155) > > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12835/4/testReport/org.apache.kafka.streams.integration/SelfJoinUpgradeIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldUpgradeWithTopologyOptimizationOff/ -- This message was sent by Atlassian Jira (v8.20.10#820010)