[jira] [Commented] (KAFKA-7657) Invalid reporting of stream state in Kafka streams application
[ https://issues.apache.org/jira/browse/KAFKA-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750278#comment-16750278 ]

James Cheng commented on KAFKA-7657:

This was fixed in [https://github.com/apache/kafka/pull/6091], but the Jira didn't get updated and linked to it, because the PR title says "K7657" instead of "KAFKA-7657".

> Invalid reporting of stream state in Kafka streams application
> --------------------------------------------------------------
>
> Key: KAFKA-7657
> URL: https://issues.apache.org/jira/browse/KAFKA-7657
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.0.1
> Reporter: Thomas Crowley
> Assignee: Guozhang Wang
> Priority: Major
> Labels: bug
> Fix For: 2.2.0
>
> We have a streams application with 3 instances running, two of which are reporting the state of REBALANCING even after they have been running for days. Restarting the application has no effect on the stream state.
> This seems suspect because each instance appears to be processing messages, and the kafka-consumer-groups CLI tool reports hardly any offset lag in any of the partitions assigned to the REBALANCING consumers. Each partition seems to be processing an equal amount of records too.
> Inspecting the state.dir on disk, it looks like the RocksDB state has been built and hovers at the expected size on disk.
> This problem has persisted for us after we rebuilt our Kafka cluster and reset topics + consumer groups in our dev environment.
> There is nothing in the logs (with level set to DEBUG) in either the broker or the application that suggests something exceptional has happened causing the application to be stuck REBALANCING.
> We are also running multiple streaming applications where this problem does not exist.
> Two differences between this application and our other streaming applications are:
> * We have processing.guarantee set to exactly_once
> * We are using a ValueTransformer which fetches from and puts data on a windowed state store
> The REBALANCING state is returned from both polling the state method of our KafkaStreams instance, and our custom metric which is derived from some logic in a KafkaStreams.StateListener class attached via the setStateListener method.
>
> While I have provided a bit of context, before I reply with some reproducible code - is there a simple way in which I can determine that my streams application is in a RUNNING state without relying on the same mechanisms as used above?
> Further, given that it seems like my application is actually running - could this perhaps be a bug to do with how the stream state is being reported (in the context of a transactional stream using the processor API)?

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
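The issue description above watches the instance state through a listener attached via setStateListener. As a self-contained illustration of that callback pattern (the `InstanceState` enum and `StateListener` interface here are hypothetical stand-ins for the real `KafkaStreams.State` and `KafkaStreams.StateListener` types, reduced so the sketch runs without Kafka on the classpath):

```java
// Sketch of the state-listener pattern the reporter describes.
// InstanceState and StateListener are stand-ins for KafkaStreams.State
// and KafkaStreams.StateListener; they are not the real Kafka types.
import java.util.concurrent.atomic.AtomicReference;

public class StateListenerSketch {
    enum InstanceState { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING }

    interface StateListener {
        void onChange(InstanceState newState, InstanceState oldState);
    }

    public static void main(String[] args) {
        AtomicReference<InstanceState> latest = new AtomicReference<>(InstanceState.CREATED);
        // Equivalent of KafkaStreams#setStateListener: remember the newest
        // state so a custom metric can report it.
        StateListener listener = (newState, oldState) -> latest.set(newState);

        // Simulate the transitions a healthy instance goes through.
        listener.onChange(InstanceState.REBALANCING, InstanceState.CREATED);
        listener.onChange(InstanceState.RUNNING, InstanceState.REBALANCING);
        System.out.println(latest.get());
    }
}
```

The bug in this ticket means such a listener (and KafkaStreams#state) can keep reporting REBALANCING even while every surviving thread is processing normally.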
[ https://issues.apache.org/jira/browse/KAFKA-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16734643#comment-16734643 ]

ASF GitHub Bot commented on KAFKA-7657:

guozhangwang commented on pull request #6090: Revert "KAFKA-7657: Fixing thread state change to instance state change"
URL: https://github.com/apache/kafka/pull/6090

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[ https://issues.apache.org/jira/browse/KAFKA-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16734642#comment-16734642 ]

ASF GitHub Bot commented on KAFKA-7657:

guozhangwang commented on pull request #6090: Revert "KAFKA-7657: Fixing thread state change to instance state change"
URL: https://github.com/apache/kafka/pull/6090

Reverts apache/kafka#6018 because it lacks one final commit.
[ https://issues.apache.org/jira/browse/KAFKA-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16734639#comment-16734639 ]

ASF GitHub Bot commented on KAFKA-7657:

guozhangwang commented on pull request #6018: KAFKA-7657: Fixing thread state change to instance state change
URL: https://github.com/apache/kafka/pull/6018
[ https://issues.apache.org/jira/browse/KAFKA-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16714156#comment-16714156 ]

ASF GitHub Bot commented on KAFKA-7657:

guozhangwang opened a new pull request #6018: KAFKA-7657: Fixing thread state change to instance state change
URL: https://github.com/apache/kafka/pull/6018

While looking into KAFKA-7657, I found a few loopholes in this logic:

1. At the KafkaStreams instance level we keep a map of thread-name to thread-state and a global-thread state, in addition to the instance state itself. `stateLock` is used when accessing the instance state; however, inside the thread state change callback we access both the thread-states map and the instance state at the same time (in the callers of `setState`) without a lock, which is vulnerable to concurrent modification by multiple stream threads. The fix: a) introduce a `threadStatesLock` in addition to the `stateLock`, which should always be grabbed before the `stateLock`, to modify the thread-states map before modifying the instance-level state; b) defer the checking of the instance-level state into the `setState` call.

2. When transiting to state RUNNING, we check whether all threads are either in RUNNING or DEAD state. This is because some threads may be dead after the rebalance, but we should still proceed to RUNNING as long as the remaining threads are transiting to RUNNING.
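The lock ordering and the RUNNING-or-DEAD check the PR describes can be sketched in isolation. This is a hedged reduction of the description above (class, lock, and state names mirror the PR text, not actual Kafka source):

```java
// Hedged sketch of the PR's fix: always take threadStatesLock before
// stateLock, update the thread-states map first, then recompute the
// instance state; a DEAD thread no longer blocks the RUNNING transition.
import java.util.HashMap;
import java.util.Map;

public class LockOrderSketch {
    enum ThreadState { RUNNING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, DEAD }
    enum InstanceState { REBALANCING, RUNNING }

    private final Object threadStatesLock = new Object();
    private final Object stateLock = new Object();
    private final Map<String, ThreadState> threadStates = new HashMap<>();
    private InstanceState instanceState = InstanceState.REBALANCING;

    void onThreadStateChange(String threadName, ThreadState newState) {
        synchronized (threadStatesLock) {          // outer lock: thread-states map
            threadStates.put(threadName, newState);
            synchronized (stateLock) {             // inner lock: instance state
                // Instance is RUNNING once every thread is RUNNING or DEAD.
                boolean allSettled = threadStates.values().stream()
                        .allMatch(s -> s == ThreadState.RUNNING || s == ThreadState.DEAD);
                instanceState = allSettled ? InstanceState.RUNNING : InstanceState.REBALANCING;
            }
        }
    }

    public static void main(String[] args) {
        LockOrderSketch app = new LockOrderSketch();
        app.onThreadStateChange("StreamThread-1", ThreadState.PARTITIONS_ASSIGNED);
        app.onThreadStateChange("StreamThread-2", ThreadState.DEAD);
        app.onThreadStateChange("StreamThread-1", ThreadState.RUNNING);
        System.out.println(app.instanceState);
    }
}
```

Nesting the two synchronized blocks in a fixed order (thread-states first, instance state second) is what prevents one thread's callback from reading a half-updated view that another thread is writing.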
[ https://issues.apache.org/jira/browse/KAFKA-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16712734#comment-16712734 ]

Patrik Kleindl commented on KAFKA-7657:

[~guozhang] Thanks for the fast analysis. It is not a major problem for us currently, so a fix in an upcoming 2.0.x or 2.1 would be sufficient. As you guessed correctly, retries is set to 10; I will increase it to hopefully prevent such a problem.
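Raising the embedded producer's retry budget can be done through the streams configuration by prefixing producer settings with "producer.". A hedged sketch (string keys follow Kafka's documented config names; the application id and bootstrap address are illustrative):

```java
// Sketch of a Streams configuration that raises the embedded producer's
// retry budget well past the 2.0.1 defaults (retries=10, retry.backoff.ms=100),
// so a short broker-side leader migration does not kill a stream thread.
import java.util.Properties;

public class RetryConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");      // illustrative
        props.put("bootstrap.servers", "broker:9092");      // illustrative
        props.put("processing.guarantee", "exactly_once");
        // "producer."-prefixed keys are forwarded to the embedded producer.
        props.put("producer.retries", Integer.toString(Integer.MAX_VALUE));
        props.put("producer.retry.backoff.ms", "100");
        System.out.println(props.getProperty("producer.retries"));
    }
}
```

With the old defaults the total retry window is roughly 10 * 100ms ≈ 1 second, which is why a leader migration lasting longer than that surfaced as a fatal producer error.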
[ https://issues.apache.org/jira/browse/KAFKA-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711839#comment-16711839 ]

Guozhang Wang commented on KAFKA-7657:

[~pkleindl] I think I've spotted the root cause of your issue. Here is a brief summary:

1) Around {{2018-11-30 09:08:14}} there is a broker-side leader migration, which caused Streams' embedded producers to get {{NotLeaderForPartitionException}}. This error is not fatal but retriable: the producer will retry based on the configured number of retries and backoff. In 2.0.1, which you and [~tscrowley] are running, the defaults are 10 retries and 100ms backoff (in trunk and the upcoming 2.1.0 we've changed the default to Integer.MAX_VALUE as part of KIP-91). So if you did not change those values and the broker-side metadata did not get refreshed within 10 * 100ms ≈ 1 second, the retries are exhausted, the error is treated as fatal, and the producer throws.

2) When the producer throws (which is the case in your logs), the corresponding stream thread (15 in your case) is shut down gracefully, and its tasks are migrated to other threads. I think after that the broker side resumed normal operation, so the other threads could proceed; i.e. from then on the streams app instance is running with one thread less, which from outside still looks normal.

3) The bug correlated with this scenario is that when 2) happens we do not remove the corresponding thread-state from the map, and because this dangling state never comes back to RUNNING, the application-instance-level state is never transited. I can provide a PR for this issue.

4) At the moment, if you want to be notified when something similar causes you to lose threads within the application instance, you can either a) watch the thread-level metrics and alert when they go dark, or b) watch the thread-level state as well, which can be read from KafkaStreams#localThreadMetadata.
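Point 3) can be reproduced in miniature. A self-contained illustration (not Kafka source; names and the instance-state check are a hedged reduction of the pre-fix 2.0.1 behavior described above) of why one dangling DEAD entry pins the instance-level state:

```java
// Demo of the root cause: a dead thread's entry is left in the
// thread-states map, and the pre-fix check requires ALL entries to be
// RUNNING, so the instance-level state can never leave REBALANCING.
import java.util.HashMap;
import java.util.Map;

public class DanglingStateDemo {
    enum ThreadState { RUNNING, DEAD }

    public static void main(String[] args) {
        Map<String, ThreadState> threadStates = new HashMap<>();
        threadStates.put("StreamThread-14", ThreadState.RUNNING);
        // Thread 15 died after the producer exhausted its retries; its
        // entry was never removed from the map (the bug).
        threadStates.put("StreamThread-15", ThreadState.DEAD);

        boolean allRunning = threadStates.values().stream()
                .allMatch(s -> s == ThreadState.RUNNING);
        System.out.println(allRunning ? "RUNNING" : "REBALANCING");
    }
}
```

Every surviving thread keeps processing, which matches the reporters' observation of near-zero consumer lag despite the stuck REBALANCING state.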
[ https://issues.apache.org/jira/browse/KAFKA-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711247#comment-16711247 ]

Patrik Kleindl commented on KAFKA-7657:

[~guozhang]
{code:java}
2018-11-30 08:58:25,560 INFO [org.apache.kafka.streams.processor.internals.StreamThread] (application-ab20ef7d-6de2-4335-a8ce-4cb81d01fb7e-StreamThread-11) - stream-thread [application-ab20ef7d-6de2-4335-a8ce-4cb81d01fb7e-StreamThread-11] State transition from RUNNING to PARTITIONS_REVOKED
2018-11-30 08:58:25,561 INFO [org.apache.kafka.streams.KafkaStreams] (application-ab20ef7d-6de2-4335-a8ce-4cb81d01fb7e-StreamThread-11) - stream-client [application-ab20ef7d-6de2-4335-a8ce-4cb81d01fb7e] State transition from RUNNING to REBALANCING
2018-11-30 08:58:25,630 INFO [org.apache.kafka.streams.processor.internals.StreamThread] (application-ab20ef7d-6de2-4335-a8ce-4cb81d01fb7e-StreamThread-10) - stream-thread [application-ab20ef7d-6de2-4335-a8ce-4cb81d01fb7e-StreamThread-10] State transition from RUNNING to PARTITIONS_REVOKED
2018-11-30 08:58:25,638 INFO [org.apache.kafka.streams.processor.internals.StreamThread] (application-ab20ef7d-6de2-4335-a8ce-4cb81d01fb7e-StreamThread-9) - stream-thread [application-ab20ef7d-6de2-4335-a8ce-4cb81d01fb7e-StreamThread-9] State transition from RUNNING to PARTITIONS_REVOKED
2018-11-30 08:58:25,885 INFO [org.apache.kafka.streams.processor.internals.StreamThread] (application-ab20ef7d-6de2-4335-a8ce-4cb81d01fb7e-StreamThread-12) - stream-thread [application-ab20ef7d-6de2-4335-a8ce-4cb81d01fb7e-StreamThread-12] State transition from RUNNING to PARTITIONS_REVOKED
2018-11-30 08:58:28,238 INFO [org.apache.kafka.streams.processor.internals.StreamThread] (application-ab20ef7d-6de2-4335-a8ce-4cb81d01fb7e-StreamThread-11) - stream-thread [application-ab20ef7d-6de2-4335-a8ce-4cb81d01fb7e-StreamThread-11] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
2018-11-30 08:58:28,238 INFO [org.apache.kafka.streams.processor.internals.StreamThread] (application-ab20ef7d-6de2-4335-a8ce-4cb81d01fb7e-StreamThread-9) - stream-thread [application-ab20ef7d-6de2-4335-a8ce-4cb81d01fb7e-StreamThread-9] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
2018-11-30 08:58:28,249 INFO [org.apache.kafka.streams.processor.internals.StreamThread] (application-ab20ef7d-6de2-4335-a8ce-4cb81d01fb7e-StreamThread-10) - stream-thread [application-ab20ef7d-6de2-4335-a8ce-4cb81d01fb7e-StreamThread-10] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
2018-11-30 08:58:28,279 INFO [org.apache.kafka.streams.processor.internals.StreamThread] (application-ab20ef7d-6de2-4335-a8ce-4cb81d01fb7e-StreamThread-12) - stream-thread [application-ab20ef7d-6de2-4335-a8ce-4cb81d01fb7e-StreamThread-12] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
2018-11-30 08:58:28,505 INFO [org.apache.kafka.streams.processor.internals.StreamThread] (application-ab20ef7d-6de2-4335-a8ce-4cb81d01fb7e-StreamThread-11) - stream-thread [application-ab20ef7d-6de2-4335-a8ce-4cb81d01fb7e-StreamThread-11] State transition from PARTITIONS_ASSIGNED to RUNNING
2018-11-30 08:58:29,664 INFO [org.apache.kafka.streams.processor.internals.StreamThread] (application-ab20ef7d-6de2-4335-a8ce-4cb81d01fb7e-StreamThread-10) - stream-thread [application-ab20ef7d-6de2-4335-a8ce-4cb81d01fb7e-StreamThread-10] State transition from PARTITIONS_ASSIGNED to RUNNING
2018-11-30 08:58:33,300 INFO [org.apache.kafka.streams.processor.internals.StreamThread] (application-ab20ef7d-6de2-4335-a8ce-4cb81d01fb7e-StreamThread-9) - stream-thread [application-ab20ef7d-6de2-4335-a8ce-4cb81d01fb7e-StreamThread-9] State transition from PARTITIONS_ASSIGNED to RUNNING
2018-11-30 08:58:34,490 INFO [org.apache.kafka.streams.processor.internals.StreamThread] (application-ab20ef7d-6de2-4335-a8ce-4cb81d01fb7e-StreamThread-12) - stream-thread [application-ab20ef7d-6de2-4335-a8ce-4cb81d01fb7e-StreamThread-12] State transition from PARTITIONS_ASSIGNED to RUNNING
2018-11-30 08:58:34,491 INFO [org.apache.kafka.streams.KafkaStreams] (application-ab20ef7d-6de2-4335-a8ce-4cb81d01fb7e-StreamThread-12) - stream-client [application-ab20ef7d-6de2-4335-a8ce-4cb81d01fb7e] State transition from REBALANCING to RUNNING
2018-11-30 09:08:14,273 INFO [org.apache.kafka.streams.processor.internals.StreamThread] (application-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-15) - stream-thread [application-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-15] State transition from RUNNING to PENDING_SHUTDOWN
2018-11-30 09:08:14,715 INFO [org.apache.kafka.streams.processor.internals.StreamThread] (application-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-15) - stream-thread [application-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-15] State transition from PENDING_SHUTDOWN to DEAD
2018-11-30 09:08:45,750 INFO
{code}
[ https://issues.apache.org/jira/browse/KAFKA-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16710864#comment-16710864 ]

Guozhang Wang commented on KAFKA-7657:

Another question for [~pkleindl] [~tscrowley]: in your topology, do you have any global stores / GlobalKTable?
[jira] [Commented] (KAFKA-7657) Invalid reporting of stream state in Kafka streams application
[ https://issues.apache.org/jira/browse/KAFKA-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16710842#comment-16710842 ] Guozhang Wang commented on KAFKA-7657: -- [~pkleindl] It seems you have at least 16 threads, given "client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16". Could you grep for the keyword {code} State transition from {code} which should match both thread-level state transitions and application instance-level state transitions? Note that the latter is where we observed the issue: it never transitions back to RUNNING, but it should as soon as ALL of its threads have transitioned to RUNNING. So I'd like to verify: 1) whether all threads do indeed transition back to RUNNING after the rebalance, and 2) if 1) is true, whether any transition happens for the application instance-level state.
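The rule Guozhang describes — the client should report RUNNING only once ALL of its threads are back to RUNNING — can be sketched as plain logic. This is an illustrative simplification with made-up type names, not the actual KafkaStreams implementation:

```java
import java.util.EnumSet;
import java.util.Set;

// Sketch of the instance-level state aggregation described above: the
// client stays REBALANCING while any thread is still mid-rebalance, and
// flips back to RUNNING only once every thread is RUNNING. Hypothetical
// enums; NOT the real KafkaStreams code, just the rule it should follow.
public class ClientStateSketch {

    enum ThreadState { RUNNING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, PENDING_SHUTDOWN, DEAD }

    enum ClientState { RUNNING, REBALANCING }

    static ClientState aggregate(Set<ThreadState> threadStates) {
        for (ThreadState s : threadStates) {
            // Any thread still revoking or re-acquiring partitions keeps
            // the whole client in REBALANCING.
            if (s == ThreadState.PARTITIONS_REVOKED || s == ThreadState.PARTITIONS_ASSIGNED) {
                return ClientState.REBALANCING;
            }
        }
        return ClientState.RUNNING;
    }

    public static void main(String[] args) {
        // One thread still mid-rebalance: client reports REBALANCING.
        System.out.println(aggregate(EnumSet.of(ThreadState.RUNNING, ThreadState.PARTITIONS_ASSIGNED)));
        // All threads RUNNING: client should flip back to RUNNING.
        System.out.println(aggregate(EnumSet.of(ThreadState.RUNNING)));
    }
}
```

The bug being discussed is precisely a case where the second transition apparently never fires even though every thread is RUNNING.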
[jira] [Commented] (KAFKA-7657) Invalid reporting of stream state in Kafka streams application
[ https://issues.apache.org/jira/browse/KAFKA-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16710573#comment-16710573 ] Patrik Kleindl commented on KAFKA-7657: --- [~guozhang] I have tried to grab the relevant part of the log and remove all client references; not much to be seen.
{code:java}
2018-11-30 08:50:06,885 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16) - ... [Consumer clientId=client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16-consumer, groupId=client-appname] Group coordinator broker:9092 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery
2018-11-30 08:50:06,986 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16) - ... [Consumer clientId=client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16-consumer, groupId=client-appname] Discovered group coordinator broker:9092 (id: 2147483644 rack: null)
2018-11-30 08:50:06,986 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16) - ... [Consumer clientId=client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16-consumer, groupId=client-appname] Group coordinator broker:9092 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery
2018-11-30 08:50:07,087 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16) - ... [Consumer clientId=client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16-consumer, groupId=client-appname] Discovered group coordinator broker:9092 (id: 2147483644 rack: null)
2018-11-30 09:08:45,717 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16) - ... [Consumer clientId=client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16-consumer, groupId=client-appname] Attempt to heartbeat failed since group is rebalancing
2018-11-30 09:08:45,749 INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] (client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16) - ... [Consumer clientId=client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16-consumer, groupId=client-appname] Revoking previously assigned partitions [...]
2018-11-30 09:08:45,750 INFO [org.apache.kafka.streams.processor.internals.StreamThread] (client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16) - ... stream-thread [client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16] State transition from RUNNING to PARTITIONS_REVOKED
2018-11-30 09:08:45,750 INFO [org.apache.kafka.streams.KafkaStreams] (client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16) - ... stream-client [client-610151c7-8769-4cc5-9254-969a831e4a4d] State transition from RUNNING to REBALANCING
2018-11-30 09:08:45,865 INFO [org.apache.kafka.streams.processor.internals.StreamThread] (client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16) - ... stream-thread [client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16] partition revocation took 115 ms.
2018-11-30 09:08:45,865 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16) - ... [Consumer clientId=client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16-consumer, groupId=client-appname] (Re-)joining group
2018-11-30 09:08:47,544 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16) - ... [Consumer clientId=client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16-consumer, groupId=client-appname] Successfully joined group with generation 3374
2018-11-30 09:08:47,547 INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] (client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16) - ... [Consumer clientId=client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16-consumer, groupId=client-appname] Setting newly assigned partitions [...]
2018-11-30 09:08:47,547 INFO [org.apache.kafka.streams.processor.internals.StreamThread] (client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16) - ... stream-thread [client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
2018-11-30 09:08:47,574 INFO [org.apache.kafka.streams.processor.internals.StreamThread] (client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16) - ... stream-thread [client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16] partition assignment took 27 ms.
2018-11-30 09:08:47,874 INFO [org.apache.kafka.streams.processor.internals.StreamThread] (client-610151c7-8769-4cc5-9254-969a831e4a4d-StreamThread-16) - ...
{code}
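Grepping the logs for the keyword Guozhang mentions can also be scripted; a rough sketch that pulls the from/to states out of lines like the ones above (the regex is an assumption based on the pasted log layout, not an official log format):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Rough sketch: extract "FROM->TO" from "State transition from X to Y"
// log lines, such as those in the pasted log above. Adjust the pattern
// if your log4j layout differs.
public class TransitionGrep {

    private static final Pattern TRANSITION =
            Pattern.compile("State transition from (\\w+) to (\\w+)");

    // Returns "FROM->TO" for a matching line, or null if the line is not
    // a state-transition log line.
    static String extract(String logLine) {
        Matcher m = TRANSITION.matcher(logLine);
        return m.find() ? m.group(1) + "->" + m.group(2) : null;
    }

    public static void main(String[] args) {
        String line = "... stream-client [client-610151c7] State transition from RUNNING to REBALANCING";
        System.out.println(extract(line)); // RUNNING->REBALANCING
    }
}
```

Running every log line through this and comparing the thread-level against the client-level transitions would answer Guozhang's questions 1) and 2) directly.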
[jira] [Commented] (KAFKA-7657) Invalid reporting of stream state in Kafka streams application
[ https://issues.apache.org/jira/browse/KAFKA-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707585#comment-16707585 ] Guozhang Wang commented on KAFKA-7657: -- [~pkleindl] Could you provide the logs from around the time when the stream application started reporting REBALANCING so we can investigate?
[jira] [Commented] (KAFKA-7657) Invalid reporting of stream state in Kafka streams application
[ https://issues.apache.org/jira/browse/KAFKA-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706877#comment-16706877 ] Patrik Kleindl commented on KAFKA-7657: --- [~guozhang] It seems we are seeing the same behaviour in one of our environments. The version is 2.0.0-cp1, and EOS is not enabled. The stream applications are running on two hosts, and on each of them I currently see one (but not the same) stream application reporting REBALANCING. I have suggested that the customer open a Confluent support ticket.
[jira] [Commented] (KAFKA-7657) Invalid reporting of stream state in Kafka streams application
[ https://issues.apache.org/jira/browse/KAFKA-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16695186#comment-16695186 ] Guozhang Wang commented on KAFKA-7657: -- Hi [~tscrowley] Thanks for reporting this issue. The described situation does sound like a bug to me, and since you are already running a fairly new version (2.0.1), this bug may well exist on trunk as well. For the moment, if the Streams state is not reliably reporting the status of your application, I'd suggest using consumer lag as a side-indicator of your application's health. To investigate this issue, could you share the steps to reproduce it, and also try turning off EOS to see whether the issue goes away? I doubt the transformer has anything to do with the bug, so my main suspect is the EOS config. If turning off EOS resolves the issue, that will help us narrow down the investigation scope.
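Guozhang's fallback of using consumer lag as a health indicator amounts to comparing the group's committed offsets against the log end offsets. A minimal sketch of that arithmetic (in practice the two maps would be populated from AdminClient.listConsumerGroupOffsets and KafkaConsumer.endOffsets; plain maps are used here so the example is self-contained):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the lag computation behind "use consumer lag as a health
// side-indicator": per-partition lag is the log end offset minus the
// group's committed offset. In a real check, the maps would come from
// AdminClient.listConsumerGroupOffsets and KafkaConsumer.endOffsets.
public class LagSketch {

    // Sum of per-partition lag; partitions with no committed offset are
    // treated as fully lagged from offset 0 (a simplifying assumption).
    static long totalLag(Map<String, Long> committed, Map<String, Long> endOffsets) {
        long total = 0;
        for (Map.Entry<String, Long> e : endOffsets.entrySet()) {
            long committedOffset = committed.getOrDefault(e.getKey(), 0L);
            total += Math.max(0, e.getValue() - committedOffset);
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Long> committed = new HashMap<>();
        committed.put("topic-0", 100L);
        committed.put("topic-1", 250L);
        Map<String, Long> end = new HashMap<>();
        end.put("topic-0", 105L);
        end.put("topic-1", 250L);
        // 5 + 0 = 5: hardly any lag, matching the reporter's observation
        // that the app keeps processing even while stuck in REBALANCING.
        System.out.println(totalLag(committed, end)); // prints 5
    }
}
```

A small, stable total lag is a good sign that the instances are healthy despite the misreported state, which is exactly the situation the reporter describes with the kafka-consumer-groups CLI tool.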