[jira] [Assigned] (KAFKA-16955) ConcurrentModification exception thrown by KafkaStream threadState access
[ https://issues.apache.org/jira/browse/KAFKA-16955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rohan Desai reassigned KAFKA-16955: --- Assignee: Rohan Desai > ConcurrentModification exception thrown by KafkaStream threadState access > - > > Key: KAFKA-16955 > URL: https://issues.apache.org/jira/browse/KAFKA-16955 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Rohan Desai >Assignee: Rohan Desai >Priority: Major > > We see occasional ConcurrentModificationExceptions thrown when accessing > threadState: > > > {code:java} > 155.745service_application1 info[ERROR] 2024-06-11 07:56:42.417 > [e2e-de9a85a4-b1d9-4807-bc34-e269f760bcd7-StreamThread-2] > ResponsiveKafkaStreams - stream-client > [e2e-de9a85a4-b1d9-4807-bc34-e269f760bcd7] Replacing thread in the streams > uncaught exception handler155.745service_application1 > infoorg.apache.kafka.streams.errors.StreamsException: > java.util.ConcurrentModificationException155.745service_application1 info > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:729) > ~[kafka-streams-3.7.0.jar:?]155.745service_application1 infoat > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645) > [kafka-streams-3.7.0.jar:?]155.745service_application1 infoCaused by: > java.util.ConcurrentModificationException155.745service_application1 info at > java.util.HashMap$HashIterator.nextNode(HashMap.java:1605) > ~[?:?]155.745service_application1 infoat > java.util.HashMap$ValueIterator.next(HashMap.java:1633) > ~[?:?]155.745service_application1 info at > org.apache.kafka.streams.KafkaStreams$StreamStateListener.maybeSetRunning(KafkaStreams.java:656) > ~[kafka-streams-3.7.0.jar:?]155.745service_application1 infoat > org.apache.kafka.streams.KafkaStreams$StreamStateListener.onChange(KafkaStreams.java:686) > ~[kafka-streams-3.7.0.jar:?]155.745service_application1 info at > org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:261) > ~[kafka-streams-3.7.0.jar:?]155.745service_application1 info at > org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:1120) > ~[kafka-streams-3.7.0.jar:?]155.745service_application1 info at > org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:921) > ~[kafka-streams-3.7.0.jar:?]155.745service_application1 infoat > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686) > ~[kafka-streams-3.7.0.jar:?]155.745service_application1 info... 1 > more {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16955) ConcurrentModification exception thrown by KafkaStream threadState access
[ https://issues.apache.org/jira/browse/KAFKA-16955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rohan Desai updated KAFKA-16955: Affects Version/s: 3.7.0 > ConcurrentModification exception thrown by KafkaStream threadState access > - > > Key: KAFKA-16955 > URL: https://issues.apache.org/jira/browse/KAFKA-16955 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Rohan Desai >Priority: Major > > We see occasional ConcurrentModificationExceptions thrown when accessing > threadState: > > > {code:java} > 155.745service_application1 info[ERROR] 2024-06-11 07:56:42.417 > [e2e-de9a85a4-b1d9-4807-bc34-e269f760bcd7-StreamThread-2] > ResponsiveKafkaStreams - stream-client > [e2e-de9a85a4-b1d9-4807-bc34-e269f760bcd7] Replacing thread in the streams > uncaught exception handler155.745service_application1 > infoorg.apache.kafka.streams.errors.StreamsException: > java.util.ConcurrentModificationException155.745service_application1 info > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:729) > ~[kafka-streams-3.7.0.jar:?]155.745service_application1 infoat > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645) > [kafka-streams-3.7.0.jar:?]155.745service_application1 infoCaused by: > java.util.ConcurrentModificationException155.745service_application1 info at > java.util.HashMap$HashIterator.nextNode(HashMap.java:1605) > ~[?:?]155.745service_application1 infoat > java.util.HashMap$ValueIterator.next(HashMap.java:1633) > ~[?:?]155.745service_application1 info at > org.apache.kafka.streams.KafkaStreams$StreamStateListener.maybeSetRunning(KafkaStreams.java:656) > ~[kafka-streams-3.7.0.jar:?]155.745service_application1 infoat > org.apache.kafka.streams.KafkaStreams$StreamStateListener.onChange(KafkaStreams.java:686) > ~[kafka-streams-3.7.0.jar:?]155.745service_application1 info at > org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:261) > ~[kafka-streams-3.7.0.jar:?]155.745service_application1 info at > org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:1120) > ~[kafka-streams-3.7.0.jar:?]155.745service_application1 info at > org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:921) > ~[kafka-streams-3.7.0.jar:?]155.745service_application1 infoat > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686) > ~[kafka-streams-3.7.0.jar:?]155.745service_application1 info... 1 > more {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16955) ConcurrentModification exception thrown by KafkaStream threadState access
[ https://issues.apache.org/jira/browse/KAFKA-16955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rohan Desai updated KAFKA-16955: Component/s: streams > ConcurrentModification exception thrown by KafkaStream threadState access > - > > Key: KAFKA-16955 > URL: https://issues.apache.org/jira/browse/KAFKA-16955 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Rohan Desai >Priority: Major > > We see occasional ConcurrentModificationExceptions thrown when accessing > threadState: > > > {code:java} > 155.745service_application1 info[ERROR] 2024-06-11 07:56:42.417 > [e2e-de9a85a4-b1d9-4807-bc34-e269f760bcd7-StreamThread-2] > ResponsiveKafkaStreams - stream-client > [e2e-de9a85a4-b1d9-4807-bc34-e269f760bcd7] Replacing thread in the streams > uncaught exception handler155.745service_application1 > infoorg.apache.kafka.streams.errors.StreamsException: > java.util.ConcurrentModificationException155.745service_application1 info > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:729) > ~[kafka-streams-3.7.0.jar:?]155.745service_application1 infoat > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645) > [kafka-streams-3.7.0.jar:?]155.745service_application1 infoCaused by: > java.util.ConcurrentModificationException155.745service_application1 info at > java.util.HashMap$HashIterator.nextNode(HashMap.java:1605) > ~[?:?]155.745service_application1 infoat > java.util.HashMap$ValueIterator.next(HashMap.java:1633) > ~[?:?]155.745service_application1 info at > org.apache.kafka.streams.KafkaStreams$StreamStateListener.maybeSetRunning(KafkaStreams.java:656) > ~[kafka-streams-3.7.0.jar:?]155.745service_application1 infoat > org.apache.kafka.streams.KafkaStreams$StreamStateListener.onChange(KafkaStreams.java:686) > ~[kafka-streams-3.7.0.jar:?]155.745service_application1 info at > org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:261) > ~[kafka-streams-3.7.0.jar:?]155.745service_application1 info at > org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:1120) > ~[kafka-streams-3.7.0.jar:?]155.745service_application1 info at > org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:921) > ~[kafka-streams-3.7.0.jar:?]155.745service_application1 infoat > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686) > ~[kafka-streams-3.7.0.jar:?]155.745service_application1 info... 1 > more {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16955) ConcurrentModification exception thrown by KafkaStream threadState access
Rohan Desai created KAFKA-16955: --- Summary: ConcurrentModification exception thrown by KafkaStream threadState access Key: KAFKA-16955 URL: https://issues.apache.org/jira/browse/KAFKA-16955 Project: Kafka Issue Type: Bug Reporter: Rohan Desai We see occasional ConcurrentModificationExceptions thrown when accessing threadState: {code:java} 155.745service_application1 info[ERROR] 2024-06-11 07:56:42.417 [e2e-de9a85a4-b1d9-4807-bc34-e269f760bcd7-StreamThread-2] ResponsiveKafkaStreams - stream-client [e2e-de9a85a4-b1d9-4807-bc34-e269f760bcd7] Replacing thread in the streams uncaught exception handler155.745service_application1 infoorg.apache.kafka.streams.errors.StreamsException: java.util.ConcurrentModificationException155.745service_application1 info at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:729) ~[kafka-streams-3.7.0.jar:?]155.745service_application1 infoat org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645) [kafka-streams-3.7.0.jar:?]155.745service_application1 infoCaused by: java.util.ConcurrentModificationException155.745service_application1 info at java.util.HashMap$HashIterator.nextNode(HashMap.java:1605) ~[?:?]155.745service_application1 infoat java.util.HashMap$ValueIterator.next(HashMap.java:1633) ~[?:?]155.745service_application1 info at org.apache.kafka.streams.KafkaStreams$StreamStateListener.maybeSetRunning(KafkaStreams.java:656) ~[kafka-streams-3.7.0.jar:?]155.745service_application1 infoat org.apache.kafka.streams.KafkaStreams$StreamStateListener.onChange(KafkaStreams.java:686) ~[kafka-streams-3.7.0.jar:?]155.745service_application1 info at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:261) ~[kafka-streams-3.7.0.jar:?]155.745service_application1 info at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:1120) ~[kafka-streams-3.7.0.jar:?]155.745service_application1 info at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:921) ~[kafka-streams-3.7.0.jar:?]155.745service_application1 infoat org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686) ~[kafka-streams-3.7.0.jar:?]155.745service_application1 info... 1 more {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
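The failure mode above can be reproduced outside of Streams. A minimal standalone sketch (illustrative class and map names, not the KafkaStreams internals): iterating a plain HashMap's values while another thread inserts into it is undefined behavior and commonly throws ConcurrentModificationException, which matches the HashMap$HashIterator frame in the trace where StreamStateListener.maybeSetRunning walks threadState.

{code:java}
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

public class ThreadStateCmeSketch {
    public static void main(String[] args) throws InterruptedException {
        final Map<Long, String> threadState = new HashMap<>();
        for (long i = 0; i < 100_000; i++) {
            threadState.put(i, "CREATED");
        }

        // Simulates a stream thread being added or replaced while the listener iterates.
        final Thread mutator = new Thread(() -> {
            for (long i = 100_000; i < 200_000; i++) {
                threadState.put(i, "STARTING");
            }
        });
        mutator.start();

        try {
            // Analogous to StreamStateListener.maybeSetRunning() walking threadState.values().
            for (final String state : threadState.values()) {
                if (!"RUNNING".equals(state)) {
                    // inspect each thread's state
                }
            }
        } catch (final ConcurrentModificationException e) {
            System.out.println("reproduced: " + e);
        } finally {
            mutator.join();
        }
    }
}
{code}

Synchronizing access to the map, or swapping it for a ConcurrentHashMap, are the usual ways to avoid this; which of those (if either) is the right fix for threadState is left open here.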
[jira] [Commented] (KAFKA-16876) TaskManager.handleRevocation doesn't handle errors thrown from task.prepareCommit
[ https://issues.apache.org/jira/browse/KAFKA-16876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851499#comment-17851499 ] Rohan Desai commented on KAFKA-16876: - [~ganesh_6] I don't have that stack trace unfortunately, but I'm pretty certain that the error is being thrown by `ProcessorStateManager#flushCache`. The problem is that `ProcessorStateManager#flushCache` throws the exception that was thrown inside the Producer's io thread and returned in the future, instead of re-wrapping it, so the stack trace is the Producer thread stack trace instead of the stream thread stack trace (which is where `flushCache` is called from). > TaskManager.handleRevocation doesn't handle errors thrown from > task.prepareCommit > - > > Key: KAFKA-16876 > URL: https://issues.apache.org/jira/browse/KAFKA-16876 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.0 >Reporter: Rohan Desai >Assignee: Ganesh Sadanala >Priority: Minor > > `TaskManager.handleRevocation` does not handle exceptions thrown by > `task.prepareCommit`. In the particular instance I observed, `pepareCommit` > flushed caches which led to downstream `producer.send` calls that threw a > `TaskMigratedException`. This means that the tasks that need to be revoked > are not suspended by `handleRevocation`. `ConsumerCoordinator` stores the > thrown exception and then moves on to the other task assignment callbacks. > One of these - `StreamsPartitionAssigner.onCommit` tries to close the tasks > and raises an `IllegalStateException`. Fortunately, it dirty-closes the tasks > if close fails so we don't leak any tasks. I think there's maybe two bugs > here: > # `TaskManager.handleRevocation` should handle errors from `prepareCommit`. > It should try not to leave any revoked tasks in an unsuspended state. > # The `ConsumerCoordinator` just throws the first exception that it sees. > But it seems bad to throw the `TaskMigratedException` and drop the > `IllegalStateException` (though in this case I think its relatively benign). > I think on `IllegalStateException` we really want the streams thread to exit. > One idea here is to have `ConsumerCoordinator` throw an exception type that > includes the other exceptions that it has seen in another field. But this > breaks the contract for clients that catch specific exceptions. I'm not sure > of a clean solution, but I think its at least worth recording that it would > be preferable to have the caller of `poll` handle all the thrown exceptions > rather than just the first one. > > Here is the IllegalStateException stack trace I observed: > {code:java} > [ 508.535] [service_application2] [inf] [ERROR] 2024-05-30 06:35:04.556 > [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] TaskManager - > stream-thread [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-St > reamThread-1] Failed to close task 0_3 cleanly. Attempting to close remaining > tasks before re-throwing: > [ 508.535] [service_application2] [inf] > java.lang.IllegalStateException: Illegal state RUNNING while closing active > task 0_3 > [ 508.535] [service_application2] [inf] at > org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:673) > ~[kafka-streams-3.6.0.jar:?] > [ 508.535] [service_application2] [inf] at > org.apache.kafka.streams.processor.internals.StreamTask.closeClean(StreamTask.java:546) > ~[kafka-streams-3.6.0.jar:?] 
> [ 508.535] [service_application2] [inf] at > org.apache.kafka.streams.processor.internals.TaskManager.closeTaskClean(TaskManager.java:1295) > ~[kafka-streams-3.6.0.jar:?] > [ 508.535] [service_application2] [inf] at > org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:630) > [kafka-streams-3.6.0.jar:?] > [ 508.535] [service_application2] [inf] at > org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350) > [kafka-streams-3.6.0.jar:?] > [ 508.535] [service_application2] [inf] at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381) > [kafka-streams-3.6.0.jar:?] > [ 508.535] [service_application2] [inf] at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315) > [kafka-clients-3.6.0.jar:?] > [ 508.535] [service_application2] [inf] at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469) > [kafka-clients-3.6.0.jar:?] > [ 508.535] [service_application2] [inf] at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(Abstrac
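A hedged sketch of the re-wrapping the comment above suggests, assuming the cache flush ends up waiting on a producer send Future: wrapping the cause in a fresh exception captures the calling (stream) thread's stack trace rather than the producer I/O thread's. The flushAndWait method and the plain RuntimeException below are illustrative, not Streams APIs.

{code:java}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class RewrapSendErrorSketch {
    // Waits for a send result and surfaces failures with the caller's stack trace.
    static void flushAndWait(final Future<?> sendResult) {
        try {
            sendResult.get();
        } catch (final ExecutionException e) {
            // Re-wrap instead of rethrowing e.getCause() as-is, so the trace points at the
            // thread that called flushAndWait, with the I/O-thread failure kept as the cause.
            throw new RuntimeException("send failed while flushing cache", e.getCause());
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted while flushing cache", e);
        }
    }
}
{code}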
[jira] [Created] (KAFKA-16876) TaskManager.handleRevocation doesn't handle errors thrown from task.prepareCommit
Rohan Desai created KAFKA-16876: --- Summary: TaskManager.handleRevocation doesn't handle errors thrown from task.prepareCommit Key: KAFKA-16876 URL: https://issues.apache.org/jira/browse/KAFKA-16876 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 3.6.0 Reporter: Rohan Desai `TaskManager.handleRevocation` does not handle exceptions thrown by `task.prepareCommit`. In the particular instance I observed, `prepareCommit` flushed caches, which led to downstream `producer.send` calls that threw a `TaskMigratedException`. This means that the tasks that need to be revoked are not suspended by `handleRevocation`. `ConsumerCoordinator` stores the thrown exception and then moves on to the other task assignment callbacks. One of these - `StreamsPartitionAssignor.onAssignment` - tries to close the tasks and raises an `IllegalStateException`. Fortunately, it dirty-closes the tasks if close fails, so we don't leak any tasks. I think there are maybe two bugs here: # `TaskManager.handleRevocation` should handle errors from `prepareCommit`. It should try not to leave any revoked tasks in an unsuspended state. # The `ConsumerCoordinator` just throws the first exception that it sees. But it seems bad to throw the `TaskMigratedException` and drop the `IllegalStateException` (though in this case I think it's relatively benign). I think on `IllegalStateException` we really want the streams thread to exit. One idea here is to have `ConsumerCoordinator` throw an exception type that includes the other exceptions that it has seen in another field. But this breaks the contract for clients that catch specific exceptions. I'm not sure of a clean solution, but I think it's at least worth recording that it would be preferable to have the caller of `poll` handle all the thrown exceptions rather than just the first one. Here is the IllegalStateException stack trace I observed: {code:java} [ 508.535] [service_application2] [inf] [ERROR] 2024-05-30 06:35:04.556 [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] TaskManager - stream-thread [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-St reamThread-1] Failed to close task 0_3 cleanly. Attempting to close remaining tasks before re-throwing: [ 508.535] [service_application2] [inf] java.lang.IllegalStateException: Illegal state RUNNING while closing active task 0_3 [ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:673) ~[kafka-streams-3.6.0.jar:?] [ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamTask.closeClean(StreamTask.java:546) ~[kafka-streams-3.6.0.jar:?] [ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.TaskManager.closeTaskClean(TaskManager.java:1295) ~[kafka-streams-3.6.0.jar:?] [ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:630) [kafka-streams-3.6.0.jar:?] [ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350) [kafka-streams-3.6.0.jar:?] [ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381) [kafka-streams-3.6.0.jar:?] [ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315) [kafka-clients-3.6.0.jar:?] 
[ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469) [kafka-clients-3.6.0.jar:?] [ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478) [kafka-clients-3.6.0.jar:?] [ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389) [kafka-clients-3.6.0.jar:?] [ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564) [kafka-clients-3.6.0.jar:?] [ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220) [kafka-clients-3.6.0.jar:?] [ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179) [kafka-clients-3.6.0.jar:?] [ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159) [kafka-clients-3.6.0.jar:?] [
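A hedged sketch of bug (1) above, not the actual TaskManager code: catch each task's prepareCommit failure, still suspend the task, and rethrow only after every revoked task has been handled, so no revoked task is left unsuspended. The Task interface is a stand-in for the Streams internals.

{code:java}
import java.util.List;

public class HandleRevocationSketch {
    interface Task {
        void prepareCommit();
        void suspend();
    }

    static void handleRevocation(final List<Task> revokedActiveTasks) {
        RuntimeException firstError = null;
        for (final Task task : revokedActiveTasks) {
            try {
                task.prepareCommit();
            } catch (final RuntimeException e) {
                if (firstError == null) {
                    firstError = e; // remember the first failure, but keep going
                }
            } finally {
                task.suspend(); // never leave a revoked task unsuspended
            }
        }
        if (firstError != null) {
            throw firstError;
        }
    }
}
{code}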
[jira] [Commented] (KAFKA-16811) Punctuate Ratio metric almost impossible to track
[ https://issues.apache.org/jira/browse/KAFKA-16811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848797#comment-17848797 ] Rohan Desai commented on KAFKA-16811: - Alternatively, or perhaps in addition to a pre-windowed value, we can add a metric that measures the total time spent in punctuate since the StreamThread was created. Then, users can compute the time spent in the window of their choice downstream of the application by taking the value of the metric at the beginning of the window and subtracting that from the value of the metric at the end of the window. > Punctuate Ratio metric almost impossible to track > - > > Key: KAFKA-16811 > URL: https://issues.apache.org/jira/browse/KAFKA-16811 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Sebastien Viale >Priority: Minor > > The Punctuate ratio metric is returned after the last record of the poll > loop. It is recomputed in every poll loop. > After a puntuate, the value is close to 1, but there is little chance that > metric is sampled at this time. > So its value is almost always 0. > A solution could be to apply a kind of "sliding window" to it and report the > value for the last x seconds. -- This message was sent by Atlassian Jira (v8.20.10#820010)
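To make the suggestion above concrete, a small sketch of the downstream computation: with a cumulative time-in-punctuate metric, the time spent punctuating within a window is just the difference between two samples of the metric, and a ratio follows from the window length. All numbers below are made up.

{code:java}
public class PunctuateWindowSketch {
    public static void main(String[] args) {
        final long sampleAtWindowStartNs = 120_000_000L;    // cumulative metric sampled at window start
        final long sampleAtWindowEndNs   = 185_000_000L;    // cumulative metric sampled at window end
        final long windowLengthNs        = 60_000_000_000L; // e.g. a 60-second sampling window

        final long punctuateNsInWindow = sampleAtWindowEndNs - sampleAtWindowStartNs;
        final double punctuateRatio = (double) punctuateNsInWindow / windowLengthNs;
        System.out.printf("punctuate time in window: %d ns (ratio %.6f)%n",
                punctuateNsInWindow, punctuateRatio);
    }
}
{code}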
[jira] [Created] (KAFKA-13692) stream thread blocked-time-ns-total metric does not include producer metadata wait time
Rohan Desai created KAFKA-13692: --- Summary: stream thread blocked-time-ns-total metric does not include producer metadata wait time Key: KAFKA-13692 URL: https://issues.apache.org/jira/browse/KAFKA-13692 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 3.2.0 Reporter: Rohan Desai Fix For: 3.3.0 The stream thread blocked-time-ns-total metric does not include producer metadata wait time (time spent in `KafkaProducer.waitOnMetadata`). This can contribute significantly to actual total blocked time in some cases. For example, if a user deletes the streams sink topic, producers will wait until the max block timeout. This time does not get included in total blocked time when it should. -- This message was sent by Atlassian Jira (v8.20.1#820001)
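A hedged sketch of the accounting this ticket asks for (illustrative names, not the Streams metrics code): any wall-clock time spent inside a blocking producer call, which may sit in KafkaProducer.waitOnMetadata for up to max.block.ms when, say, the sink topic has been deleted, would be added to the thread's cumulative blocked-time counter like any other blocked segment.

{code:java}
import java.util.concurrent.Callable;

public class BlockedTimeSketch {
    private long totalBlockedTimeNs = 0L;

    // Times an arbitrary blocking call (e.g. producer.send(...).get()) and adds the
    // elapsed wall-clock time to the cumulative blocked-time counter.
    <T> T timeBlockingCall(final Callable<T> blockingCall) throws Exception {
        final long start = System.nanoTime();
        try {
            return blockingCall.call();
        } finally {
            totalBlockedTimeNs += System.nanoTime() - start;
        }
    }

    long totalBlockedTimeNs() {
        return totalBlockedTimeNs;
    }
}
{code}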
[jira] [Created] (KAFKA-13229) KIP-761: implement a total blocked time metric in Kafka Streams
Rohan Desai created KAFKA-13229: --- Summary: KIP-761: implement a total blocked time metric in Kafka Streams Key: KAFKA-13229 URL: https://issues.apache.org/jira/browse/KAFKA-13229 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 3.1.0 Reporter: Rohan Desai Fix For: 3.1.0 KIP-761 proposes a total blocked time metric in streams that measures the total time (since the thread was started) that a given thread is blocked on Kafka. KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-761%3A+Add+Total+Blocked+Time+Metric+to+Streams -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12707) KafkaProducer should have a clearer error message on sasl.mechanism misconfiguration
Rohan Desai created KAFKA-12707: --- Summary: KafkaProducer should have a clearer error message on sasl.mechanism misconfiguration Key: KAFKA-12707 URL: https://issues.apache.org/jira/browse/KAFKA-12707 Project: Kafka Issue Type: Bug Affects Versions: 3.0.0 Reporter: Rohan Desai Not sure if this is producer-specific, but I accidentally configured my producer by setting: ``` sasl.mechanism=plain ``` instead of ``` sasl.mechanism=PLAIN ``` When I did this, the producer just hangs and logs in a loop like this, which isn't very informative: [2021-04-21 21:33:20,519] WARN [Producer clientId=producer-1] Bootstrap broker pkc-g91q1.us-west-2.aws.stag.cpdev.cloud:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1050) [2021-04-21 21:33:21,584] INFO [Producer clientId=producer-1] Failed to create channel due to (org.apache.kafka.common.network.SaslChannelBuilder:239) org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to create SaslClient with mechanism plain [2021-04-21 21:33:21,584] WARN [Producer clientId=producer-1] Error connecting to node pkc-g91q1.us-west-2.aws.stag.cpdev.cloud:9092 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:982) java.io.IOException: Channel could not be created for socket java.nio.channels.SocketChannel[closed] at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:348) at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329) at org.apache.kafka.common.network.Selector.connect(Selector.java:256) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:977) at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73) at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1148) at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1036) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:240) at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338) ... 10 more Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to create SaslClient with mechanism plain [2021-04-21 21:33:21,584] WARN [Producer clientId=producer-1] Bootstrap broker pkc-g91q1.us-west-2.aws.stag.cpdev.cloud:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1050) It would be better to early-exit with a clear error message -- This message was sent by Atlassian Jira (v8.3.4#803005)
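For reference, a hedged sketch of the intended configuration: the SASL mechanism name is case-sensitive and must be "PLAIN". The bootstrap server and JAAS credentials below are placeholders.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;

public class SaslProducerConfigSketch {
    // Builds producer properties with the mechanism spelled correctly ("PLAIN").
    public static Properties producerProps() {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        props.put("security.protocol", "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); // "plain" fails the mechanism lookup
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"user\" password=\"secret\";"); // placeholder credentials
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return props;
    }
}
{code}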
[jira] [Commented] (KAFKA-10585) Kafka Streams should clean up the state store directory from cleanup
[ https://issues.apache.org/jira/browse/KAFKA-10585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17210308#comment-17210308 ] Rohan Desai commented on KAFKA-10585: - Yes, this is a good summary. > Kafka Streams should clean up the state store directory from cleanup > > > Key: KAFKA-10585 > URL: https://issues.apache.org/jira/browse/KAFKA-10585 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Rohan Desai >Priority: Major > > Currently, `KafkaStreams.cleanup` cleans up all the task-level directories > and the global directory. However it doesn't clean up the enclosing state > store directory, though streams does create this directory when it > initializes the state for the streams app. Feels like it should remove this > directory when it cleans up. > We notice this in ksql quite often, since every new query is a new streams > app. Over time, we see lots of state store directories left around for old > queries. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10585) Kafka Streams should clean up the state store directory from cleanup
Rohan Desai created KAFKA-10585: --- Summary: Kafka Streams should clean up the state store directory from cleanup Key: KAFKA-10585 URL: https://issues.apache.org/jira/browse/KAFKA-10585 Project: Kafka Issue Type: Bug Components: streams Reporter: Rohan Desai Currently, `KafkaStreams.cleanup` cleans up all the task-level directories and the global directory. However, it doesn't clean up the enclosing state store directory, even though streams creates this directory when it initializes the state for the streams app. It feels like it should remove this directory when it cleans up. We notice this in ksql quite often, since every new query is a new streams app. Over time, we see lots of state store directories left around for old queries. -- This message was sent by Atlassian Jira (v8.3.4#803005)
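Until this is addressed, a hedged workaround sketch (not a Streams API): after KafkaStreams#cleanUp() has removed the task-level and global directories, delete the now-empty per-application directory under state.dir yourself. The path below assumes the default state.dir and an illustrative application id.

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

public class StateDirCleanupSketch {
    public static void main(String[] args) throws IOException {
        // Assumes the default state.dir of /tmp/kafka-streams; "my-application-id" is a placeholder.
        final Path appStateDir = Paths.get("/tmp/kafka-streams", "my-application-id");
        if (Files.exists(appStateDir)) {
            try (Stream<Path> paths = Files.walk(appStateDir)) {
                paths.sorted(Comparator.reverseOrder()) // delete children before parents
                     .forEach(p -> p.toFile().delete());
            }
        }
    }
}
{code}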
[jira] [Commented] (KAFKA-10396) Overall memory of container keep on growing due to kafka stream / rocksdb and OOM killed once limit reached
[ https://issues.apache.org/jira/browse/KAFKA-10396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17180897#comment-17180897 ] Rohan Desai commented on KAFKA-10396: - > I mean to say if my topic going to have 10 Million record then what will be >the value of Cache and writeBufferManager? and if topic going to have just 5k >records then what will be the value of Cache and writeBufferManager? You can try to size your cache/wbm according to the available resources. So if you have XGB of memory on your system, choose some value significantly less than that and allocate that to cache/wbm to bound memory usage from rocksdb. Remember, even if you choose a cache size that's smaller than it could be, the OS will still cache data for you (it's just a bit more expensive to access it). So err on the side of choosing something smaller. FWIW we use ~20% of total memory for the cache (so 6GB out of 30). Then that, plus the size of your Java heap, should give you the total memory usage from Java and RocksDB. > Overall memory of container keep on growing due to kafka stream / rocksdb and > OOM killed once limit reached > --- > > Key: KAFKA-10396 > URL: https://issues.apache.org/jira/browse/KAFKA-10396 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.1, 2.5.0 >Reporter: Vagesh Mathapati >Assignee: Rohan Desai >Priority: Critical > Attachments: CustomRocksDBConfig.java, MyStreamProcessor.java, > kafkaStreamConfig.java > > > We are observing that overall memory of our container keep on growing and > never came down. > After analysis find out that rocksdbjni.so is keep on allocating 64M chunks > of memory off-heap and never releases back. This causes OOM kill after memory > reaches configured limit. > We use Kafka stream and globalktable for our many kafka topics. > Below is our environment > * Kubernetes cluster > * openjdk 11.0.7 2020-04-14 LTS > * OpenJDK Runtime Environment Zulu11.39+16-SA (build 11.0.7+10-LTS) > * OpenJDK 64-Bit Server VM Zulu11.39+16-SA (build 11.0.7+10-LTS, mixed mode) > * Springboot 2.3 > * spring-kafka-2.5.0 > * kafka-streams-2.5.0 > * kafka-streams-avro-serde-5.4.0 > * rocksdbjni-5.18.3 > Observed same result with kafka 2.3 version. > Below is the snippet of our analysis > from pmap output we took addresses from these 64M allocations (RSS) > Address Kbytes RSS Dirty Mode Mapping > 7f3ce800 65536 65532 65532 rw--- [ anon ] > 7f3cf400 65536 65536 65536 rw--- [ anon ] > 7f3d6400 65536 65536 65536 rw--- [ anon ] > We tried to match with memory allocation logs enabled with the help of Azul > systems team. 
> @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741] > - 0x7f3ce8ff7ca0 > @ /tmp/librocksdbjni6564497922441568920.so: > _ZN7rocksdb15BlockBasedTable3GetERKNS_11ReadOptionsERKNS_5SliceEPNS_10GetContextEPKNS_14SliceTransformEb+0x894)[0x7f3e1c898fd4] > - 0x7f3ce8ff9780 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da] > - 0x7f3ce8ff9750 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741] > - 0x7f3ce8ff97c0 > @ > /tmp/librocksdbjni6564497922441568920.so:_Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da] > - 0x7f3ce8ffccf0 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741] > - 0x7f3ce8ffcd10 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da] > - 0x7f3ce8ffccf0 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741] > - 0x7f3ce8ffcd10 > We also identified that content on this 64M is just 0s and no any data > present in it. > I tried to tune the rocksDB configuratino as mentioned but it did not helped. > [https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config] > > Please let me know if you need any more details -- This message was sent by Atlassian Jira (v8.3.4#803005)
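To make the sizing advice concrete, a hedged sketch of a bounded-memory RocksDBConfigSetter in the spirit of the ksqlDB one linked further down this thread: a single static Cache and WriteBufferManager shared by every store, sized to a fraction of the container's memory. The 6 GB figure just reuses the example number from the comment above and is not a recommendation; the class itself is illustrative, not a library class, and assumes a rocksdbjni version with the WriteBufferManager API (5.18.3 as in this thread, or newer).

{code:java}
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.WriteBufferManager;

public class BoundedMemoryConfigSetterSketch implements RocksDBConfigSetter {

    private static final long TOTAL_OFF_HEAP_BYTES = 6L * 1024 * 1024 * 1024; // ~20% of a 30 GB host
    private static final long TOTAL_MEMTABLE_BYTES = TOTAL_OFF_HEAP_BYTES / 2;

    // Shared across all stores so the bound applies to the whole application.
    private static final Cache CACHE = new LRUCache(TOTAL_OFF_HEAP_BYTES, -1, false, 0.1);
    private static final WriteBufferManager WRITE_BUFFER_MANAGER =
        new WriteBufferManager(TOTAL_MEMTABLE_BYTES, CACHE); // memtables also count against the cache

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCache(CACHE);
        tableConfig.setCacheIndexAndFilterBlocks(true); // index/filter blocks count against the cache too
        options.setWriteBufferManager(WRITE_BUFFER_MANAGER);
        options.setTableFormatConfig(tableConfig);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // Intentionally do not close the shared CACHE / WRITE_BUFFER_MANAGER; they outlive individual stores.
    }
}
{code}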
[jira] [Commented] (KAFKA-10396) Overall memory of container keep on growing due to kafka stream / rocksdb and OOM killed once limit reached
[ https://issues.apache.org/jira/browse/KAFKA-10396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17178355#comment-17178355 ] Rohan Desai commented on KAFKA-10396: - > I am not able to come up with optimized number. Generally what numbers we >should keep ? I'm not sure what you mean by an optimized number. Can you elaborate? > So the configuration of rocksDB which i am setting is per ktable/kstream >right? Looking at your config-setter, it looks like you're passing the same cache and write-buffer-manager to all callers, so the bound should apply across all streams/tables. Also, make sure you fix the leak in `getDefaultDataFromStore`. You need to close `storeIterator`, otherwise the iterator in rocksdb will not be destroyed and if it has blocks pinned then it's likely your configured bounds will be violated. > Overall memory of container keep on growing due to kafka stream / rocksdb and > OOM killed once limit reached > --- > > Key: KAFKA-10396 > URL: https://issues.apache.org/jira/browse/KAFKA-10396 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.1, 2.5.0 >Reporter: Vagesh Mathapati >Priority: Critical > Attachments: CustomRocksDBConfig.java, MyStreamProcessor.java, > kafkaStreamConfig.java > > > We are observing that overall memory of our container keep on growing and > never came down. > After analysis find out that rocksdbjni.so is keep on allocating 64M chunks > of memory off-heap and never releases back. This causes OOM kill after memory > reaches configured limit. > We use Kafka stream and globalktable for our many kafka topics. > Below is our environment > * Kubernetes cluster > * openjdk 11.0.7 2020-04-14 LTS > * OpenJDK Runtime Environment Zulu11.39+16-SA (build 11.0.7+10-LTS) > * OpenJDK 64-Bit Server VM Zulu11.39+16-SA (build 11.0.7+10-LTS, mixed mode) > * Springboot 2.3 > * spring-kafka-2.5.0 > * kafka-streams-2.5.0 > * kafka-streams-avro-serde-5.4.0 > * rocksdbjni-5.18.3 > Observed same result with kafka 2.3 version. > Below is the snippet of our analysis > from pmap output we took addresses from these 64M allocations (RSS) > Address Kbytes RSS Dirty Mode Mapping > 7f3ce800 65536 65532 65532 rw--- [ anon ] > 7f3cf400 65536 65536 65536 rw--- [ anon ] > 7f3d6400 65536 65536 65536 rw--- [ anon ] > We tried to match with memory allocation logs enabled with the help of Azul > systems team. 
> @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741] > - 0x7f3ce8ff7ca0 > @ /tmp/librocksdbjni6564497922441568920.so: > _ZN7rocksdb15BlockBasedTable3GetERKNS_11ReadOptionsERKNS_5SliceEPNS_10GetContextEPKNS_14SliceTransformEb+0x894)[0x7f3e1c898fd4] > - 0x7f3ce8ff9780 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da] > - 0x7f3ce8ff9750 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741] > - 0x7f3ce8ff97c0 > @ > /tmp/librocksdbjni6564497922441568920.so:_Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da] > - 0x7f3ce8ffccf0 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741] > - 0x7f3ce8ffcd10 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da] > - 0x7f3ce8ffccf0 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741] > - 0x7f3ce8ffcd10 > We also identified that content on this 64M is just 0s and no any data > present in it. > I tried to tune the rocksDB configuratino as mentioned but it did not helped. > [https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config] > > Please let me know if you need any more details -- This message was sent by Atlassian Jira (v8.3.4#803005)
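A hedged sketch of the iterator hygiene called out above: KeyValueIterator is Closeable, so opening it in try-with-resources guarantees the underlying RocksDB iterator is destroyed and any blocks it pins are released. The store and generic types are illustrative stand-ins for whatever getDefaultDataFromStore reads.

{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class StoreScanSketch {
    static <K, V> void scanStore(final ReadOnlyKeyValueStore<K, V> store) {
        try (KeyValueIterator<K, V> it = store.all()) {
            while (it.hasNext()) {
                final KeyValue<K, V> entry = it.next();
                // process entry.key / entry.value
            }
        } // it.close() runs here even if processing throws
    }
}
{code}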
[jira] [Commented] (KAFKA-10396) Overall memory of container keep on growing due to kafka stream / rocksdb and OOM killed once limit reached
[ https://issues.apache.org/jira/browse/KAFKA-10396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177610#comment-17177610 ] Rohan Desai commented on KAFKA-10396: - In addition to configuring the write buffer manager, make sure you close the iterator you open in {{getDefaultDataFromStore}} > Overall memory of container keep on growing due to kafka stream / rocksdb and > OOM killed once limit reached > --- > > Key: KAFKA-10396 > URL: https://issues.apache.org/jira/browse/KAFKA-10396 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.1, 2.5.0 >Reporter: Vagesh Mathapati >Priority: Critical > Attachments: MyStreamProcessor.java, kafkaStreamConfig.java > > > We are observing that overall memory of our container keep on growing and > never came down. > After analysis find out that rocksdbjni.so is keep on allocating 64M chunks > of memory off-heap and never releases back. This causes OOM kill after memory > reaches configured limit. > We use Kafka stream and globalktable for our many kafka topics. > Below is our environment > * Kubernetes cluster > * openjdk 11.0.7 2020-04-14 LTS > * OpenJDK Runtime Environment Zulu11.39+16-SA (build 11.0.7+10-LTS) > * OpenJDK 64-Bit Server VM Zulu11.39+16-SA (build 11.0.7+10-LTS, mixed mode) > * Springboot 2.3 > * spring-kafka-2.5.0 > * kafka-streams-2.5.0 > * kafka-streams-avro-serde-5.4.0 > * rocksdbjni-5.18.3 > Observed same result with kafka 2.3 version. > Below is the snippet of our analysis > from pmap output we took addresses from these 64M allocations (RSS) > Address Kbytes RSS Dirty Mode Mapping > 7f3ce800 65536 65532 65532 rw--- [ anon ] > 7f3cf400 65536 65536 65536 rw--- [ anon ] > 7f3d6400 65536 65536 65536 rw--- [ anon ] > We tried to match with memory allocation logs enabled with the help of Azul > systems team. > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741] > - 0x7f3ce8ff7ca0 > @ /tmp/librocksdbjni6564497922441568920.so: > _ZN7rocksdb15BlockBasedTable3GetERKNS_11ReadOptionsERKNS_5SliceEPNS_10GetContextEPKNS_14SliceTransformEb+0x894)[0x7f3e1c898fd4] > - 0x7f3ce8ff9780 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da] > - 0x7f3ce8ff9750 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741] > - 0x7f3ce8ff97c0 > @ > /tmp/librocksdbjni6564497922441568920.so:_Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da] > - 0x7f3ce8ffccf0 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741] > - 0x7f3ce8ffcd10 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da] > - 0x7f3ce8ffccf0 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741] > - 0x7f3ce8ffcd10 > We also identified that content on this 64M is just 0s and no any data > present in it. 
> I tried to tune the rocksDB configuratino as mentioned but it did not helped. > [https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config] > > Please let me know if you need any more details -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10396) Overall memory of container keep on growing due to kafka stream / rocksdb and OOM killed once limit reached
[ https://issues.apache.org/jira/browse/KAFKA-10396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177577#comment-17177577 ] Rohan Desai commented on KAFKA-10396: - It may also be a different problem. Something else to try would be to configure the write buffer manager from your rocksdb config setter (I don't think the example in the streams doc does this). You could take a look at the ksqlDB config-setter as an example: [https://github.com/confluentinc/ksql/blob/master/ksqldb-rocksdb-config-setter/src/main/java/io/confluent/ksql/rocksdb/KsqlBoundedMemoryRocksDBConfigSetter.java] > Overall memory of container keep on growing due to kafka stream / rocksdb and > OOM killed once limit reached > --- > > Key: KAFKA-10396 > URL: https://issues.apache.org/jira/browse/KAFKA-10396 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.1, 2.5.0 >Reporter: Vagesh Mathapati >Priority: Critical > Attachments: MyStreamProcessor.java, kafkaStreamConfig.java > > > We are observing that overall memory of our container keep on growing and > never came down. > After analysis find out that rocksdbjni.so is keep on allocating 64M chunks > of memory off-heap and never releases back. This causes OOM kill after memory > reaches configured limit. > We use Kafka stream and globalktable for our many kafka topics. > Below is our environment > * Kubernetes cluster > * openjdk 11.0.7 2020-04-14 LTS > * OpenJDK Runtime Environment Zulu11.39+16-SA (build 11.0.7+10-LTS) > * OpenJDK 64-Bit Server VM Zulu11.39+16-SA (build 11.0.7+10-LTS, mixed mode) > * Springboot 2.3 > * spring-kafka-2.5.0 > * kafka-streams-2.5.0 > * kafka-streams-avro-serde-5.4.0 > * rocksdbjni-5.18.3 > Observed same result with kafka 2.3 version. > Below is the snippet of our analysis > from pmap output we took addresses from these 64M allocations (RSS) > Address Kbytes RSS Dirty Mode Mapping > 7f3ce800 65536 65532 65532 rw--- [ anon ] > 7f3cf400 65536 65536 65536 rw--- [ anon ] > 7f3d6400 65536 65536 65536 rw--- [ anon ] > We tried to match with memory allocation logs enabled with the help of Azul > systems team. 
> @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741] > - 0x7f3ce8ff7ca0 > @ /tmp/librocksdbjni6564497922441568920.so: > _ZN7rocksdb15BlockBasedTable3GetERKNS_11ReadOptionsERKNS_5SliceEPNS_10GetContextEPKNS_14SliceTransformEb+0x894)[0x7f3e1c898fd4] > - 0x7f3ce8ff9780 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da] > - 0x7f3ce8ff9750 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741] > - 0x7f3ce8ff97c0 > @ > /tmp/librocksdbjni6564497922441568920.so:_Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da] > - 0x7f3ce8ffccf0 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741] > - 0x7f3ce8ffcd10 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da] > - 0x7f3ce8ffccf0 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741] > - 0x7f3ce8ffcd10 > We also identified that content on this 64M is just 0s and no any data > present in it. > I tried to tune the rocksDB configuratino as mentioned but it did not helped. > [https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config] > > Please let me know if you need any more details -- This message was sent by Atlassian Jira (v8.3.4#803005)
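One wiring detail, as a hedged sketch: a custom RocksDBConfigSetter (such as the ksqlDB one linked above) only takes effect once it is registered in the streams configuration. The class name below is a placeholder.

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class RegisterConfigSetterSketch {
    public static Properties streamsProps() {
        final Properties props = new Properties();
        // Point Streams at the bounded-memory config setter; the class name is a placeholder.
        props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
                  "my.package.BoundedMemoryRocksDBConfig");
        return props;
    }
}
{code}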
[jira] [Commented] (KAFKA-10396) Overall memory of container keep on growing due to kafka stream / rocksdb and OOM killed once limit reached
[ https://issues.apache.org/jira/browse/KAFKA-10396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177574#comment-17177574 ] Rohan Desai commented on KAFKA-10396: - [~vmathapati] it should show up on your pmap. For example: {code:java} sudo pmap -X 13799 13799: ./a.out Address Perm Offset Device Inode Size Rss Pss Referenced Anonymous ShmemPmdMapped Shared_Hugetlb Private_Hugetlb Swap SwapPss Locked ProtectionKey Mapping ... 7fa5c36da000 r--p 103:02 274900 28 28 28 28 0 0 0 00 0 0 0 libjemalloc.so.2 7fa5c36e1000 r-xp 7000 103:02 274900 608 396 396 396 0 0 0 00 0 0 0 libjemalloc.so.2 7fa5c3779000 r--p 0009f000 103:02 274900 64 64 64 64 0 0 0 00 0 0 0 libjemalloc.so.2 7fa5c3789000 ---p 000af000 103:02 2749004 0 0 0 0 0 0 00 0 0 0 libjemalloc.so.2 7fa5c378a000 r--p 000af000 103:02 274900 20 20 20 2016 0 0 00 0 0 0 libjemalloc.so.2 7fa5c378f000 rw-p 000b4000 103:02 2749004 4 4 4 4 0 0 00 0 0 0 libjemalloc.so.2 ...{code} > Overall memory of container keep on growing due to kafka stream / rocksdb and > OOM killed once limit reached > --- > > Key: KAFKA-10396 > URL: https://issues.apache.org/jira/browse/KAFKA-10396 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.1, 2.5.0 >Reporter: Vagesh Mathapati >Priority: Critical > Attachments: MyStreamProcessor.java, kafkaStreamConfig.java > > > We are observing that overall memory of our container keep on growing and > never came down. > After analysis find out that rocksdbjni.so is keep on allocating 64M chunks > of memory off-heap and never releases back. This causes OOM kill after memory > reaches configured limit. > We use Kafka stream and globalktable for our many kafka topics. > Below is our environment > * Kubernetes cluster > * openjdk 11.0.7 2020-04-14 LTS > * OpenJDK Runtime Environment Zulu11.39+16-SA (build 11.0.7+10-LTS) > * OpenJDK 64-Bit Server VM Zulu11.39+16-SA (build 11.0.7+10-LTS, mixed mode) > * Springboot 2.3 > * spring-kafka-2.5.0 > * kafka-streams-2.5.0 > * kafka-streams-avro-serde-5.4.0 > * rocksdbjni-5.18.3 > Observed same result with kafka 2.3 version. > Below is the snippet of our analysis > from pmap output we took addresses from these 64M allocations (RSS) > Address Kbytes RSS Dirty Mode Mapping > 7f3ce800 65536 65532 65532 rw--- [ anon ] > 7f3cf400 65536 65536 65536 rw--- [ anon ] > 7f3d6400 65536 65536 65536 rw--- [ anon ] > We tried to match with memory allocation logs enabled with the help of Azul > systems team. 
> @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741] > - 0x7f3ce8ff7ca0 > @ /tmp/librocksdbjni6564497922441568920.so: > _ZN7rocksdb15BlockBasedTable3GetERKNS_11ReadOptionsERKNS_5SliceEPNS_10GetContextEPKNS_14SliceTransformEb+0x894)[0x7f3e1c898fd4] > - 0x7f3ce8ff9780 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da] > - 0x7f3ce8ff9750 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741] > - 0x7f3ce8ff97c0 > @ > /tmp/librocksdbjni6564497922441568920.so:_Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da] > - 0x7f3ce8ffccf0 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741] > - 0x7f3ce8ffcd10 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da] > - 0x7f3ce8ffccf0 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFa
[jira] [Commented] (KAFKA-10396) Overall memory of container keep on growing due to kafka stream / rocksdb and OOM killed once limit reached
[ https://issues.apache.org/jira/browse/KAFKA-10396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177530#comment-17177530 ] Rohan Desai commented on KAFKA-10396: - [~vmathapati] you'd have to install it, and then look for it in the lib directories. The exact steps depend on the system. For example, on debian/ubuntu you can run: {{apt install libjemalloc-dev}} Then, run {{sudo find /usr/lib/ -name libjemalloc.so}} to get the so path. > Overall memory of container keep on growing due to kafka stream / rocksdb and > OOM killed once limit reached > --- > > Key: KAFKA-10396 > URL: https://issues.apache.org/jira/browse/KAFKA-10396 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.1, 2.5.0 >Reporter: Vagesh Mathapati >Priority: Critical > Attachments: MyStreamProcessor.java, kafkaStreamConfig.java > > > We are observing that overall memory of our container keep on growing and > never came down. > After analysis find out that rocksdbjni.so is keep on allocating 64M chunks > of memory off-heap and never releases back. This causes OOM kill after memory > reaches configured limit. > We use Kafka stream and globalktable for our many kafka topics. > Below is our environment > * Kubernetes cluster > * openjdk 11.0.7 2020-04-14 LTS > * OpenJDK Runtime Environment Zulu11.39+16-SA (build 11.0.7+10-LTS) > * OpenJDK 64-Bit Server VM Zulu11.39+16-SA (build 11.0.7+10-LTS, mixed mode) > * Springboot 2.3 > * spring-kafka-2.5.0 > * kafka-streams-2.5.0 > * kafka-streams-avro-serde-5.4.0 > * rocksdbjni-5.18.3 > Observed same result with kafka 2.3 version. > Below is the snippet of our analysis > from pmap output we took addresses from these 64M allocations (RSS) > Address Kbytes RSS Dirty Mode Mapping > 7f3ce800 65536 65532 65532 rw--- [ anon ] > 7f3cf400 65536 65536 65536 rw--- [ anon ] > 7f3d6400 65536 65536 65536 rw--- [ anon ] > We tried to match with memory allocation logs enabled with the help of Azul > systems team. 
> @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741] > - 0x7f3ce8ff7ca0 > @ /tmp/librocksdbjni6564497922441568920.so: > _ZN7rocksdb15BlockBasedTable3GetERKNS_11ReadOptionsERKNS_5SliceEPNS_10GetContextEPKNS_14SliceTransformEb+0x894)[0x7f3e1c898fd4] > - 0x7f3ce8ff9780 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da] > - 0x7f3ce8ff9750 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741] > - 0x7f3ce8ff97c0 > @ > /tmp/librocksdbjni6564497922441568920.so:_Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da] > - 0x7f3ce8ffccf0 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741] > - 0x7f3ce8ffcd10 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da] > - 0x7f3ce8ffccf0 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741] > - 0x7f3ce8ffcd10 > We also identified that content on this 64M is just 0s and no any data > present in it. > I tried to tune the rocksDB configuratino as mentioned but it did not helped. > [https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config] > > Please let me know if you need any more details -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10357) Handle accidental deletion of repartition-topics as exceptional failure
[ https://issues.apache.org/jira/browse/KAFKA-10357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177425#comment-17177425 ] Rohan Desai commented on KAFKA-10357: - > So maybe we can consider just fixing KAFKA-3370 and resetting policy to >`none` would fix it, and we just need an elegant way to shutdown the whole >application and notify the user when this exception get thrown due to >re-creation of the repartition topics. WDYT? One issue here is that we're pushing the responsibility of handling this scenario without data loss into the application. Typically I'd expect most applications that see this error to exit - the app can no longer make progress. However most apps running in a production setting are wrapped in some sort of retry loop. For example, someone just using Streams might run their service under something like upstart and would typically configure it to just restart the process when it exits. Or maybe they are running in k8s which would start a new pod when a pod exits. In ksql we would just try to restart the query. Even if we included the smarts to detect this case and not restart, we'd need to persist this information somewhere so that we would know not to do this on a restart. It seems preferable to me to have streams be able to detect when its internal state is invalid. Requiring explicit initialization would be one way to do this. > a new Streams client could be started before the rebalance that should report >the error took place I would expect that a user would do this initialization as a manual step before starting their application. I think it's fine for there to be some initial configuration that's not done automatically by streams. > Handle accidental deletion of repartition-topics as exceptional failure > --- > > Key: KAFKA-10357 > URL: https://issues.apache.org/jira/browse/KAFKA-10357 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Bruno Cadonna >Priority: Major > > Repartition topics are both written by Stream's producer and read by Stream's > consumer, so when they are accidentally deleted both clients may be notified. > But in practice the consumer would react to it much quicker than producer > since the latter has a delivery timeout expiration period (see > https://issues.apache.org/jira/browse/KAFKA-10356). When consumer reacts to > it, it will re-join the group since metadata changed and during the triggered > rebalance it would auto-recreate the topic silently and continue, causing > data lost silently. > One idea, is to only create all repartition topics *once* in the first > rebalance and not auto-create them any more in future rebalances, instead it > would be treated similar as INCOMPLETE_SOURCE_TOPIC_METADATA error code > (https://issues.apache.org/jira/browse/KAFKA-10355). > The challenge part would be, how to determine if it is the first-ever > rebalance, and there are several wild ideas I'd like to throw out here: > 1) change the thread state transition diagram so that STARTING state would > not transit to PARTITION_REVOKED but only to PARTITION_ASSIGNED, then in the > assign function we can check if the state is still in CREATED and not RUNNING. > 2) augment the subscriptionInfo to encode whether or not this is the first > time ever rebalance. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10396) Overall memory of container keep on growing due to kafka stream / rocksdb and OOM killed once limit reached
[ https://issues.apache.org/jira/browse/KAFKA-10396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177331#comment-17177331 ] Rohan Desai commented on KAFKA-10396: - [~vmathapati] we've hit a similar issue in ksqlDB, which runs streams underneath. We suspect it is due to fragmentation that manifests when running with the glibc allocator, and have found that running with jemalloc ([https://github.com/jemalloc/jemalloc]) avoids the issue. To see if this also helps you, you can try installing jemalloc and then running your application with the following environment variable: {{LD_PRELOAD=}} If you could provide more details about how you build your image (or maybe share your dockerfile) I could help you get jemalloc installed. > Overall memory of container keep on growing due to kafka stream / rocksdb and > OOM killed once limit reached > --- > > Key: KAFKA-10396 > URL: https://issues.apache.org/jira/browse/KAFKA-10396 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.1, 2.5.0 >Reporter: Vagesh Mathapati >Priority: Critical > Attachments: MyStreamProcessor.java, kafkaStreamConfig.java > > > We are observing that the overall memory of our container keeps on growing and > never comes down. > After analysis we found that rocksdbjni.so keeps on allocating 64M chunks > of memory off-heap and never releases them back. This causes an OOM kill after memory > reaches the configured limit. > We use Kafka Streams and GlobalKTable for many of our Kafka topics. > Below is our environment > * Kubernetes cluster > * openjdk 11.0.7 2020-04-14 LTS > * OpenJDK Runtime Environment Zulu11.39+16-SA (build 11.0.7+10-LTS) > * OpenJDK 64-Bit Server VM Zulu11.39+16-SA (build 11.0.7+10-LTS, mixed mode) > * Springboot 2.3 > * spring-kafka-2.5.0 > * kafka-streams-2.5.0 > * kafka-streams-avro-serde-5.4.0 > * rocksdbjni-5.18.3 > Observed the same result with Kafka version 2.3. > Below is a snippet of our analysis > from the pmap output we took addresses from these 64M allocations (RSS) > Address Kbytes RSS Dirty Mode Mapping > 7f3ce800 65536 65532 65532 rw--- [ anon ] > 7f3cf400 65536 65536 65536 rw--- [ anon ] > 7f3d6400 65536 65536 65536 rw--- [ anon ] > We tried to match with memory allocation logs enabled with the help of the Azul > Systems team. 
> @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741] > - 0x7f3ce8ff7ca0 > @ /tmp/librocksdbjni6564497922441568920.so: > _ZN7rocksdb15BlockBasedTable3GetERKNS_11ReadOptionsERKNS_5SliceEPNS_10GetContextEPKNS_14SliceTransformEb+0x894)[0x7f3e1c898fd4] > - 0x7f3ce8ff9780 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da] > - 0x7f3ce8ff9750 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741] > - 0x7f3ce8ff97c0 > @ > /tmp/librocksdbjni6564497922441568920.so:_Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da] > - 0x7f3ce8ffccf0 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741] > - 0x7f3ce8ffcd10 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da] > - 0x7f3ce8ffccf0 > @ /tmp/librocksdbjni6564497922441568920.so: > _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741] > - 0x7f3ce8ffcd10 > We also identified that content on this 64M is just 0s and no any data > present in it. > I tried to tune the rocksDB configuratino as mentioned but it did not helped. > [https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config] > > Please let me know if you need any more details -- This message was sent by Atlassian Jira (v8.3.4#803005)
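On the RocksDB config tuning mentioned just above: the bounded-memory pattern from the Kafka Streams memory-management docs looks roughly like the sketch below (the cache and write-buffer sizes are placeholder assumptions, not recommendations). It caps the block cache, index/filter blocks and memtables against one shared budget; it would not, by itself, address glibc fragmentation, which is what the jemalloc suggestion targets.

{code:java}
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.WriteBufferManager;

// Registered via: props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedMemoryRocksDBConfig.class);
public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {

    // Placeholder budgets: one cache shared by every store on the instance,
    // with memtables counted against the same budget.
    private static final Cache CACHE = new LRUCache(256 * 1024 * 1024L);
    private static final WriteBufferManager WRITE_BUFFER_MANAGER =
        new WriteBufferManager(64 * 1024 * 1024L, CACHE);

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(CACHE);                     // share one block cache across stores
        tableConfig.setCacheIndexAndFilterBlocks(true);       // index/filter blocks count against the cache
        options.setTableFormatConfig(tableConfig);
        options.setWriteBufferManager(WRITE_BUFFER_MANAGER);  // memtables count against the cache too
    }

    @Override
    public void close(final String storeName, final Options options) {
        // The cache and write-buffer manager are shared across stores, so nothing is closed here.
    }
}
{code}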
[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
[ https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140166#comment-17140166 ] Rohan Desai commented on KAFKA-10179: - Also, it's not really clear from the documentation that `serialize(deserialize())` is assumed to be the identity function for `ktable(..)`. > State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables > - > > Key: KAFKA-10179 > URL: https://issues.apache.org/jira/browse/KAFKA-10179 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Major > Fix For: 2.7.0 > > > {{MeteredKeyValueStore}} passes the name of the changelog topic of the state > store to the state store serdes. Currently, it always passes {{<application ID>-<state store name>-changelog}} as the changelog topic name. However, for > optimized source tables the changelog topic is the source topic. > Most serdes do not use the topic name passed to them. However, if the serdes > actually use the topic name for (de)serialization, e.g., when Kafka Streams > is used with Confluent's Schema Registry, a > {{org.apache.kafka.common.errors.SerializationException}} is thrown. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
[ https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140158#comment-17140158 ] Rohan Desai edited comment on KAFKA-10179 at 6/19/20, 3:19 AM: --- Deserialization may itself be a transformation. For example, suppose I have source data with 10 fields, but only care about 3 of them for my stream processing app. It seems that it would be reasonable to provide a deserializer that just extracts those 3 fields. I suppose you could express this as a projection after creating the table, but that does preclude optimizations that use selective deserialization. And it may be much more expensive to do the materialization (since you're potentially materializing lots of data unnecessarily). I think there should be some way to achieve each of the following: * optimized and the data in the store is exactly the same as the topic data . In this case (what's implemented today) the data can be restored by writing the source records into the store * optimized and the deserializer transforms the data somehow. In this case the data can be restored by deserializing/serializing each row from the source topic before writing it into the store. I don't think this is possible today. * not optimized (w/ which you could have a transforming deserializer and faster recovery, at the cost of extra data in kafka). I don't think this is possible today without turning all optimizations off. > This is a known issue and tracked via: >https://issues.apache.org/jira/browse/KAFKA-8037 ack - thanks! was (Author: desai.p.rohan): Deserialization may itself be a transformation. For example, suppose I have source data with 10 fields, but only care about 3 of them for my stream processing app. It seems that it would be reasonable to provide a deserializer that just extracts those 3 fields. I suppose you could express this as a projection after creating the table, but that does preclude optimizations that use selective deserialization. And it may be much more expensive to do the materialization (since you're potentially materializing lots of data unnecessarily). I think there should be some way to achieve each of the following: * optimized and the data in the store is exactly the same as the topic data . In this case (what's implemented today) the data can be restored by writing the source records into the store * optimized and the deserializer transforms the data somehow. In this case the data can be restored by deserializing/serializing each row from the source topic before writing it into the store. I don't think this is possible today. * not optimized (which would you have a transforming deserializer and faster recovery, at the cost of extra data in kafka). I don't think this is possible today without turning all optimizations off. > This is a known issue and tracked via: >https://issues.apache.org/jira/browse/KAFKA-8037 ack - thanks! > State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables > - > > Key: KAFKA-10179 > URL: https://issues.apache.org/jira/browse/KAFKA-10179 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Major > Fix For: 2.7.0 > > > {{MeteredKeyValueStore}} passes the name of the changelog topic of the state > store to the state store serdes. Currently, it always passes {{ ID>--changelog}} as the changelog topic name. However, for > optimized source tables the changelog topic is the source topic. 
> Most serdes do not use the topic name passed to them. However, if the serdes > actually use the topic name for (de)serialization, e.g., when Kafka Streams > is used with Confluent's Schema Registry, a > {{org.apache.kafka.common.errors.SerializationException}} is thrown. -- This message was sent by Atlassian Jira (v8.3.4#803005)
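To make the "deserialization as a transformation" example above concrete, here is a hypothetical deserializer that keeps only three of ten comma-separated fields (the record type and wire format are made up). A matching serializer could never reproduce the original bytes, so serialize(deserialize(bytes)) is not the identity, and restoring a store by copying raw source-topic records is not equivalent to re-running the serde.

{code:java}
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical record type: the source topic carries ten comma-separated fields,
// but the application only cares about three of them.
class UserSummary {
    final String userId;
    final String region;
    final long lastSeenMs;

    UserSummary(final String userId, final String region, final long lastSeenMs) {
        this.userId = userId;
        this.region = region;
        this.lastSeenMs = lastSeenMs;
    }
}

// A deserializer that is also a projection: the seven dropped fields cannot be recovered
// by any serializer, so a serde round trip does not reproduce the source-topic bytes.
class UserSummaryDeserializer implements Deserializer<UserSummary> {
    @Override
    public UserSummary deserialize(final String topic, final byte[] data) {
        if (data == null) {
            return null;
        }
        final String[] fields = new String(data, StandardCharsets.UTF_8).split(",");
        // Fields 0, 3 and 7 are assumptions about a made-up wire format.
        return new UserSummary(fields[0], fields[3], Long.parseLong(fields[7]));
    }
}
{code}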
[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
[ https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140158#comment-17140158 ] Rohan Desai commented on KAFKA-10179: - Deserialization may itself be a transformation. For example, suppose I have source data with 10 fields, but only care about 3 of them for my stream processing app. It seems that it would be reasonable to provide a deserializer that just extracts those 3 fields. I suppose you could express this as a projection after creating the table, but that does preclude optimizations that use selective deserialization. And it may be much more expensive to do the materialization (since you're potentially materializing lots of data unnecessarily). I think there should be some way to achieve each of the following: * optimized and the data in the store is exactly the same as the topic data. In this case (what's implemented today) the data can be restored by writing the source records into the store * optimized and the deserializer transforms the data somehow. In this case the data can be restored by deserializing/serializing each row from the source topic before writing it into the store. I don't think this is possible today. * not optimized (with which you could have a transforming deserializer and faster recovery, at the cost of extra data in Kafka). I don't think this is possible today without turning all optimizations off. > This is a known issue and tracked via: >https://issues.apache.org/jira/browse/KAFKA-8037 ack - thanks! > State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables > - > > Key: KAFKA-10179 > URL: https://issues.apache.org/jira/browse/KAFKA-10179 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Major > Fix For: 2.7.0 > > > {{MeteredKeyValueStore}} passes the name of the changelog topic of the state > store to the state store serdes. Currently, it always passes {{<application ID>-<state store name>-changelog}} as the changelog topic name. However, for > optimized source tables the changelog topic is the source topic. > Most serdes do not use the topic name passed to them. However, if the serdes > actually use the topic name for (de)serialization, e.g., when Kafka Streams > is used with Confluent's Schema Registry, a > {{org.apache.kafka.common.errors.SerializationException}} is thrown. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
[ https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139926#comment-17139926 ] Rohan Desai commented on KAFKA-10179: - I'm not sure it's correct to use the same "topic" name for materializing optimized source tables, as it's logically different data. In the normal flow (not recovery), we're taking the topic data, validating/transforming it by deserializing it (which might apply some transforms like projecting just the fields of interest), then serializing it, and then writing it into the store. So the "topic" we pass to the serializer should be different, since it represents different data from the source topic. This has consequences in practice when used with a schema registry using the Confluent serializers. If we use the same topic, `serialize` might register a different schema with the source subject, which we probably don't want. I think the technically correct thing to do (though this is of course more expensive) would be (when the source table is optimized) to deserialize and serialize each record when restoring. Another issue that I think exists (I need to try to reproduce it), and that deserializing/serializing would solve, is skipped validation. The source topic deserializer functions as a sort of validator for records from the source topic. When the streams app is configured to skip on deserialization errors, bad source records are just skipped. However, if we restore by just writing those records to the state store, we now hit the deserialization error when reading the state store, which is a query-killing error. > State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables > - > > Key: KAFKA-10179 > URL: https://issues.apache.org/jira/browse/KAFKA-10179 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Major > Fix For: 2.7.0 > > > {{MeteredKeyValueStore}} passes the name of the changelog topic of the state > store to the state store serdes. Currently, it always passes {{<application ID>-<state store name>-changelog}} as the changelog topic name. However, for > optimized source tables the changelog topic is the source topic. > Most serdes do not use the topic name passed to them. However, if the serdes > actually use the topic name for (de)serialization, e.g., when Kafka Streams > is used with Confluent's Schema Registry, a > {{org.apache.kafka.common.errors.SerializationException}} is thrown. -- This message was sent by Atlassian Jira (v8.3.4#803005)
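A rough sketch of the two restore strategies discussed above, using hypothetical serde and store types rather than the actual Streams restore path: byte-for-byte copying versus a deserialize/serialize round trip that applies the same validation and transformation as normal processing.

{code:java}
import org.apache.kafka.common.serialization.Serde;

// Illustrative only -- not the actual Streams restore code path.
class RestoreSketch<K, V> {

    // Minimal stand-in for a bytes store; only what the sketch needs.
    interface BytesStore {
        void put(byte[] key, byte[] value);
    }

    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;
    private final String sourceTopic;

    RestoreSketch(final Serde<K> keySerde, final Serde<V> valueSerde, final String sourceTopic) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.sourceTopic = sourceTopic;
    }

    // What source-topic optimization effectively does today: copy raw bytes verbatim.
    // Records the deserializer would have rejected or transformed land in the store as-is.
    void restoreVerbatim(final BytesStore store, final byte[] key, final byte[] value) {
        store.put(key, value);
    }

    // The more expensive alternative argued for above: run the serde round trip so the
    // store matches what normal processing would have written, and drop records that fail
    // deserialization (analogous to the "log and continue" deserialization handler).
    void restoreThroughSerde(final BytesStore store, final byte[] key, final byte[] value) {
        try {
            final K k = keySerde.deserializer().deserialize(sourceTopic, key);
            final V v = valueSerde.deserializer().deserialize(sourceTopic, value);
            store.put(keySerde.serializer().serialize(sourceTopic, k),
                      valueSerde.serializer().serialize(sourceTopic, v));
        } catch (final RuntimeException e) {
            // skip the bad record instead of poisoning later reads of the store
        }
    }
}
{code}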
[jira] [Updated] (KAFKA-9659) Kafka Streams / Consumer configured for static membership fails on "fatal exception: group.instance.id gets fenced"
[ https://issues.apache.org/jira/browse/KAFKA-9659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rohan Desai updated KAFKA-9659: --- Summary: Kafka Streams / Consumer configured for static membership fails on "fatal exception: group.instance.id gets fenced" (was: Kafka Streams / Consumer fails on "fatal exception: group.instance.id gets fenced") > Kafka Streams / Consumer configured for static membership fails on "fatal > exception: group.instance.id gets fenced" > --- > > Key: KAFKA-9659 > URL: https://issues.apache.org/jira/browse/KAFKA-9659 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.5.0 >Reporter: Rohan Desai >Priority: Major > Attachments: ksql-1.logs > > > I'm running a KSQL query, which underneath is built into a Kafka Streams > application. The application has been running without issue for a few days, > until today, when all the streams threads exited with: > > > {{[ERROR] 2020-03-05 00:57:58,776 > [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] > org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - > [Consumer instanceId=ksql-1-2, > clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer, > groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] > Received fatal exception: group.instance.id gets fenced}} > {{[ERROR] 2020-03-05 00:57:58,776 > [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] > org.apache.kafka.clients.consumer.internals.AbstractCoordinator onFailure - > [Consumer instanceId=ksql-1-2, > clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer, > groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] > Caught fenced group.instance.id Optional[ksql-1-2] error in heartbeat thread}} > {{[ERROR] 2020-03-05 00:57:58,776 > [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] > org.apache.kafka.streams.processor.internals.StreamThread run - > stream-thread > [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] > Encountered the following unexpected Kafka exception during processing, this > usually indicate Streams internal errors:}} > \{{ org.apache.kafka.common.errors.FencedInstanceIdException: The broker > rejected this static consumer since another consumer with the same > group.instance.id has registered with a different member.id.}}{{[INFO] > 2020-03-05 00:57:58,776 > [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] > org.apache.kafka.streams.processor.internals.StreamThread setState - > stream-thread > [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] > State transition from RUNNING to PENDING_SHUTDOWN}} > > I've attached the KSQL and Kafka Streams logs to this ticket. 
Here's a > summary for one of the streams threads (instance id `ksql-1-2`): > > Around 00:56:36 the coordinator fails over from b11 to b2: > > {{[INFO] 2020-03-05 00:56:36,258 > [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] > org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - > [Consumer instanceId=ksql-1-2, > clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer, > groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Attempt to > heartbeat failed since coordinator > b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483636 rack: > null) is either not started or not valid.}} > {{ [INFO] 2020-03-05 00:56:36,258 > [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] > org.apache.kafka.clients.consumer.internals.AbstractCoordinator > markCoordinatorUnknown - [Consumer instanceId=ksql-1-2, > clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer, > groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Group > coordinator b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: > 2147483636 rack: null) is unavailable or invalid, will attempt rediscovery}} > {{ [INFO] 2020-03-05 00:56:36,270 > [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_
[jira] [Commented] (KAFKA-9659) Kafka Streams / Consumer fails on "fatal exception: group.instance.id gets fenced"
[ https://issues.apache.org/jira/browse/KAFKA-9659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17051806#comment-17051806 ] Rohan Desai commented on KAFKA-9659: Some of the streams threads died because the heartbeat failed (e.g. thread 1 of this query): {{[INFO] 2020-03-05 00:56:36,339 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - [Consumer instanceId=ksql-1-1, clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-1-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Attempt to heartbeat failed since coordinator b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483636 rack: null) is either not started or not valid.}} {{[INFO] 2020-03-05 00:56:36,339 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator markCoordinatorUnknown - [Consumer instanceId=ksql-1-1, clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-1-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Group coordinator b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483636 rack: null) is unavailable or invalid, will attempt rediscovery}} {{[INFO] 2020-03-05 00:56:36,390 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator onSuccess - [Consumer instanceId=ksql-1-1, clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-1-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Discovered group coordinator b2-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483645 rack: null)}} {{[ERROR] 2020-03-05 00:56:51,857 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - [Consumer instanceId=ksql-1-1, clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-1-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Received fatal exception: group.instance.id gets fenced}} {{[ERROR] 2020-03-05 00:56:51,858 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator onFailure - [Consumer instanceId=ksql-1-1, clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-1-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Caught fenced group.instance.id Optional[ksql-1-1] error in heartbeat thread}} {{[ERROR] 2020-03-05 00:56:51,859 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-1] org.apache.kafka.streams.processor.internals.StreamThread run - stream-thread [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:}} {{[INFO] 
2020-03-05 00:56:51,859 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-1] org.apache.kafka.streams.processor.internals.StreamThread setState - stream-thread [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN}} {{[INFO] 2020-03-05 00:56:51,859 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-1] org.apache.kafka.streams.processor.internals.StreamThread completeShutdown - stream-thread [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-1] Shutting down}} {{[INFO] 2020-03-05 00:56:51,889 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-1] org.apache.kafka.clients.consumer.KafkaConsumer unsubscribe - [Consumer instanceId=ksql-1, clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions}} {{[INFO] 2020-03-05 00:56:51,889 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-1] or
[jira] [Updated] (KAFKA-9659) Kafka Streams / Consumer fails on "fatal exception: group.instance.id gets fenced"
[ https://issues.apache.org/jira/browse/KAFKA-9659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rohan Desai updated KAFKA-9659: --- Description: I'm running a KSQL query, which underneath is built into a Kafka Streams application. The application has been running without issue for a few days, until today, when all the streams threads exited with: {{[ERROR] 2020-03-05 00:57:58,776 [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] Received fatal exception: group.instance.id gets fenced}} {{[ERROR] 2020-03-05 00:57:58,776 [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator onFailure - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] Caught fenced group.instance.id Optional[ksql-1-2] error in heartbeat thread}} {{[ERROR] 2020-03-05 00:57:58,776 [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] org.apache.kafka.streams.processor.internals.StreamThread run - stream-thread [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:}} \{{ org.apache.kafka.common.errors.FencedInstanceIdException: The broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id.}}{{[INFO] 2020-03-05 00:57:58,776 [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] org.apache.kafka.streams.processor.internals.StreamThread setState - stream-thread [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] State transition from RUNNING to PENDING_SHUTDOWN}} I've attached the KSQL and Kafka Streams logs to this ticket. 
Here's a summary for one of the streams threads (instance id `ksql-1-2`): Around 00:56:36 the coordinator fails over from b11 to b2: {{[INFO] 2020-03-05 00:56:36,258 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Attempt to heartbeat failed since coordinator b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483636 rack: null) is either not started or not valid.}} {{ [INFO] 2020-03-05 00:56:36,258 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator markCoordinatorUnknown - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Group coordinator b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483636 rack: null) is unavailable or invalid, will attempt rediscovery}} {{ [INFO] 2020-03-05 00:56:36,270 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator onSuccess - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Discovered group coordinator b2-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483645 rack: null)}} A few seconds later, offset commits start failing with an error indicating the new coordinator is initializing: {{[WARN] 2020-03-05 00:56:39,048 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator handle - [Consumer instanceId=ksql-1-2,
[jira] [Updated] (KAFKA-9659) Kafka Streams / Consumer fails on "fatal exception: group.instance.id gets fenced"
[ https://issues.apache.org/jira/browse/KAFKA-9659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rohan Desai updated KAFKA-9659: --- Attachment: ksql-1.logs > Kafka Streams / Consumer fails on "fatal exception: group.instance.id gets > fenced" > -- > > Key: KAFKA-9659 > URL: https://issues.apache.org/jira/browse/KAFKA-9659 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.5.0 >Reporter: Rohan Desai >Priority: Major > Attachments: ksql-1.logs > > > I'm running a KSQL query, which underneath is built into a Kafka Streams > application. The application has been running without issue for a few days, > until today, when all the streams threads exited with: > > > {{[ERROR] 2020-03-05 00:57:58,776 > [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] > org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - > [Consumer instanceId=ksql-1-2, > clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer, > groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] > Received fatal exception: group.instance.id gets fenced}} > {{[ERROR] 2020-03-05 00:57:58,776 > [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] > org.apache.kafka.clients.consumer.internals.AbstractCoordinator onFailure - > [Consumer instanceId=ksql-1-2, > clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer, > groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] > Caught fenced group.instance.id Optional[ksql-1-2] error in heartbeat thread}} > {{[ERROR] 2020-03-05 00:57:58,776 > [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] > org.apache.kafka.streams.processor.internals.StreamThread run - > stream-thread > [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] > Encountered the following unexpected Kafka exception during processing, this > usually indicate Streams internal errors:}} > \{{ org.apache.kafka.common.errors.FencedInstanceIdException: The broker > rejected this static consumer since another consumer with the same > group.instance.id has registered with a different member.id.}}{{[INFO] > 2020-03-05 00:57:58,776 > [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] > org.apache.kafka.streams.processor.internals.StreamThread setState - > stream-thread > [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] > State transition from RUNNING to PENDING_SHUTDOWN}} > > I've attached the KSQL and Kafka Streams logs to this ticket. 
Here's a > summary for one of the streams threads (instance id `ksql-1-2`): > > Around 00:56:36 the coordinator fails over from b11 to b2: > > {{[INFO] 2020-03-05 00:56:36,258 > [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] > org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - > [Consumer instanceId=ksql-1-2, > clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer, > groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Attempt to > heartbeat failed since coordinator > b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483636 rack: > null) is either not started or not valid.}} > {{ [INFO] 2020-03-05 00:56:36,258 > [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] > org.apache.kafka.clients.consumer.internals.AbstractCoordinator > markCoordinatorUnknown - [Consumer instanceId=ksql-1-2, > clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer, > groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Group > coordinator b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: > 2147483636 rack: null) is unavailable or invalid, will attempt rediscovery}} > {{ [INFO] 2020-03-05 00:56:36,270 > [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] > org.apache.kafka.clients.consumer.internals.AbstractCoordinator onSuccess - > [Consumer instanceId=ksql-1-2, > clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-4
[jira] [Updated] (KAFKA-9659) Kafka Streams / Consumer fails on "fatal exception: group.instance.id gets fenced"
[ https://issues.apache.org/jira/browse/KAFKA-9659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rohan Desai updated KAFKA-9659: --- Description: I'm running a KSQL query, which underneath is built into a Kafka Streams application. The application has been running without issue for a few days, until today, when all the streams threads exited with: {{[ERROR] 2020-03-05 00:57:58,776 [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] Received fatal exception: group.instance.id gets fenced}} {{[ERROR] 2020-03-05 00:57:58,776 [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator onFailure - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] Caught fenced group.instance.id Optional[ksql-1-2] error in heartbeat thread}} {{[ERROR] 2020-03-05 00:57:58,776 [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] org.apache.kafka.streams.processor.internals.StreamThread run - stream-thread [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:}} {{ org.apache.kafka.common.errors.FencedInstanceIdException: The broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id.}}{{[INFO] 2020-03-05 00:57:58,776 [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] org.apache.kafka.streams.processor.internals.StreamThread setState - stream-thread [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] State transition from RUNNING to PENDING_SHUTDOWN}} This event coincided with a broker (broker 2) having some downtime (as measured by a healthchecking service which periodically pings it with a produce/consume). I've attached the KSQL and Kafka Streams logs to this ticket. 
Here's a summary for one of the streams threads (instance id `ksql-1-2`): Around 00:56:36 the coordinator fails over from b11 to b2: {{[INFO] 2020-03-05 00:56:36,258 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Attempt to heartbeat failed since coordinator b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483636 rack: null) is either not started or not valid.}} {{ [INFO] 2020-03-05 00:56:36,258 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator markCoordinatorUnknown - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Group coordinator b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483636 rack: null) is unavailable or invalid, will attempt rediscovery}} {{ [INFO] 2020-03-05 00:56:36,270 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator onSuccess - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Discovered group coordinator b2-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483645 rack: null)}} A few seconds later, offset commits start failing with an error indicating the new coordinator is initializing: {{[WARN] 2020-03-05 00:56:39,048 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_U
[jira] [Updated] (KAFKA-9659) Kafka Streams / Consumer fails on "fatal exception: group.instance.id gets fenced"
[ https://issues.apache.org/jira/browse/KAFKA-9659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rohan Desai updated KAFKA-9659: --- Description: I'm running a KSQL query, which underneath is built into a Kafka Streams application. The application has been running without issue for a few days, until today, when all the streams threads exited with: {{[ERROR] 2020-03-05 00:57:58,776 [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] Received fatal exception: group.instance.id gets fenced}} {{[ERROR] 2020-03-05 00:57:58,776 [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator onFailure - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] Caught fenced group.instance.id Optional[ksql-1-2] error in heartbeat thread}} {{[ERROR] 2020-03-05 00:57:58,776 [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] org.apache.kafka.streams.processor.internals.StreamThread run - stream-thread [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:}} \{{ org.apache.kafka.common.errors.FencedInstanceIdException: The broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id.}}{{[INFO] 2020-03-05 00:57:58,776 [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] org.apache.kafka.streams.processor.internals.StreamThread setState - stream-thread [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] State transition from RUNNING to PENDING_SHUTDOWN}} I've attached the KSQL and Kafka Streams logs to this ticket. 
Here's a summary for one of the streams threads (instance id `ksql-1-2`): Around 00:56:36 the coordinator fails over from b11 to b2: {{[INFO] 2020-03-05 00:56:36,258 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Attempt to heartbeat failed since coordinator b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483636 rack: null) is either not started or not valid.}} {{ [INFO] 2020-03-05 00:56:36,258 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator markCoordinatorUnknown - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Group coordinator b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483636 rack: null) is unavailable or invalid, will attempt rediscovery}} {{ [INFO] 2020-03-05 00:56:36,270 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator onSuccess - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Discovered group coordinator b2-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483645 rack: null)}} A few seconds later, offset commits start failing with an error indicating the new coordinator is initializing: {{[WARN] 2020-03-05 00:56:39,048 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator handle - [Consumer instanceId=ksql-1-2,
[jira] [Created] (KAFKA-9659) Kafka Streams / Consumer fails on "fatal exception: group.instance.id gets fenced"
Rohan Desai created KAFKA-9659: -- Summary: Kafka Streams / Consumer fails on "fatal exception: group.instance.id gets fenced" Key: KAFKA-9659 URL: https://issues.apache.org/jira/browse/KAFKA-9659 Project: Kafka Issue Type: Bug Affects Versions: 2.5.0 Reporter: Rohan Desai I'm running a KSQL query, which underneath is built into a Kafka Streams application. The application has been running without issue for a few days, until today, when all the streams threads exited with: ``` [ERROR] 2020-03-05 00:57:58,776 [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] Received fatal exception: group.instance.id gets fenced [ERROR] 2020-03-05 00:57:58,776 [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator onFailure - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] Caught fenced group.instance.id Optional[ksql-1-2] error in heartbeat thread [ERROR] 2020-03-05 00:57:58,776 [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] org.apache.kafka.streams.processor.internals.StreamThread run - stream-thread [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: org.apache.kafka.common.errors.FencedInstanceIdException: The broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id. [INFO] 2020-03-05 00:57:58,776 [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] org.apache.kafka.streams.processor.internals.StreamThread setState - stream-thread [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] State transition from RUNNING to PENDING_SHUTDOWN ``` This event coincided with a broker (broker 2) having some downtime (as measured by a healthchecking service which periodically pings it with a produce/consume). I've attached the KSQL and Kafka Streams logs to this ticket. 
Here's a summary for one of the streams threads (instance id `ksql-1-2`): Around 00:56:36 the coordinator fails over from b11 to b2: ``` [INFO] 2020-03-05 00:56:36,258 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Attempt to heartbeat failed since coordinator b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483636 rack: null) is either not started or not valid. [INFO] 2020-03-05 00:56:36,258 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator markCoordinatorUnknown - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Group coordinator b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483636 rack: null) is unavailable or invalid, will attempt rediscovery [INFO] 2020-03-05 00:56:36,270 [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator onSuccess - [Consumer instanceId=ksql-1-2, clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer, groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Discovered group coordinator b2-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483645 rack: null) ``` A few seconds later, offset commits start
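For context on the static membership setup involved here, a minimal sketch of how {{group.instance.id}} is typically wired into a Streams app is shown below (ids, sizes and servers are placeholders; ksqlDB does the equivalent internally, and Streams derives a per-thread instance id from the configured value, which is presumably why the logs above show instanceId=ksql-1-2 for StreamThread-2). The fatal error above means the broker saw a second member claim the same instance id with a different member id.

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StaticMembershipConfigSketch {

    public static Properties streamsProps(final String instanceOrdinal) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");        // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder

        // Static membership: every process instance must supply a unique, *stable* id.
        // Two live members presenting the same id is what produces FencedInstanceIdException.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG),
                  "example-instance-" + instanceOrdinal);

        // With static membership the session timeout is usually raised so a quick process
        // bounce does not trigger a rebalance (the value here is just an assumption).
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 30_000);
        return props;
    }
}
{code}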
[jira] [Commented] (KAFKA-7896) Add some Log4J Kafka Properties for Producing to Secured Brokers
[ https://issues.apache.org/jira/browse/KAFKA-7896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16760045#comment-16760045 ] Rohan Desai commented on KAFKA-7896: Thanks [~dongjin]! I did also file a KIP, which is here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-425%3A+Add+some+Log4J+Kafka+Appender+Properties+for+Producing+to+Secured+Brokers > Add some Log4J Kafka Properties for Producing to Secured Brokers > > > Key: KAFKA-7896 > URL: https://issues.apache.org/jira/browse/KAFKA-7896 > Project: Kafka > Issue Type: Bug >Reporter: Rohan Desai >Assignee: Rohan Desai >Priority: Major > > The existing Log4J Kafka appender supports producing to brokers that use the > GSSAPI (Kerberos) SASL mechanism, and only supports configuring JAAS via a > JAAS config file. Filing this issue to cover extending this to include the > PLAIN mechanism and to support configuring JAAS via an in-line configuration. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7896) Add some Log4J Kafka Properties for Producing to Secured Brokers
Rohan Desai created KAFKA-7896: -- Summary: Add some Log4J Kafka Properties for Producing to Secured Brokers Key: KAFKA-7896 URL: https://issues.apache.org/jira/browse/KAFKA-7896 Project: Kafka Issue Type: Bug Reporter: Rohan Desai The existing Log4J Kafka appender supports producing to brokers that use the GSSAPI (Kerberos) SASL mechanism, and only supports configuring JAAS via a JAAS config file. Filing this issue to cover extending this to include the PLAIN mechanism and to support configuring JAAS via an in-line configuration. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
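For reference, the "in-line JAAS configuration" the ticket asks for is the {{sasl.jaas.config}} style of client setup sketched below (placeholder broker and credentials); the ticket and KIP-425 are about letting the Log4J appender accept the same kind of configuration instead of requiring a separate JAAS file.

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;

public class InlineJaasProducerSketch {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9093");            // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // SASL/PLAIN with the JAAS configuration supplied inline, rather than via
        // -Djava.security.auth.login.config and a separate jaas file.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"example-user\" password=\"example-password\";");        // placeholders

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("log-topic", "hello from the appender sketch"));
        }
    }
}
{code}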
[jira] [Updated] (KAFKA-7311) Sender should reset next batch expiry time between poll loops
[ https://issues.apache.org/jira/browse/KAFKA-7311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rohan Desai updated KAFKA-7311: --- Description: Sender/RecordAccumulator never resets the next batch expiry time. Its always computed as the min of the current value and the expiry time for all batches being processed. This means that its always set to the expiry time of the first batch, and once that time has passed Sender starts spinning on epoll with a timeout of 0, which consumes a lot of CPU. This patch updates Sender to reset the next batch expiry time on each poll loop so that a new value reflecting the expiry time for the current set of batches is computed. We observed this running KSQL when investigating why throughput would drop after about 10 minutes (the default delivery timeout). (was: Sender/RecordAccumulator never resets the next batch expiry time. Its always computed as the min of the current value and the expiry time for all batches being processed. This means that its always set to the expiry time of the first batch, and once that time has passed Sender starts spinning on epoll with a timeout of 0, which consumes a lot of CPU. This patch updates Sender to reset the next batch expiry time on each poll loop so that a new value reflecting the expiry time for the current set of batches is computed. We hit this when testing KSQL) > Sender should reset next batch expiry time between poll loops > - > > Key: KAFKA-7311 > URL: https://issues.apache.org/jira/browse/KAFKA-7311 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.1.0 >Reporter: Rohan Desai >Priority: Major > Fix For: 2.1.0 > > > Sender/RecordAccumulator never resets the next batch expiry time. Its always > computed as the min of the current value and the expiry time for all batches > being processed. This means that its always set to the expiry time of the > first batch, and once that time has passed Sender starts spinning on epoll > with a timeout of 0, which consumes a lot of CPU. This patch updates Sender > to reset the next batch expiry time on each poll loop so that a new value > reflecting the expiry time for the current set of batches is computed. We > observed this running KSQL when investigating why throughput would drop after > about 10 minutes (the default delivery timeout). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7311) Sender should reset next batch expiry time between poll loops
[ https://issues.apache.org/jira/browse/KAFKA-7311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rohan Desai updated KAFKA-7311: --- Description: Sender/RecordAccumulator never resets the next batch expiry time. Its always computed as the min of the current value and the expiry time for all batches being processed. This means that its always set to the expiry time of the first batch, and once that time has passed Sender starts spinning on epoll with a timeout of 0, which consumes a lot of CPU. This patch updates Sender to reset the next batch expiry time on each poll loop so that a new value reflecting the expiry time for the current set of batches is computed. We hit this when testing KSQL (was: Sender/RecordAccumulator never resets the next batch expiry time. Its always computed as the min of the current value and the expiry time for all batches being processed. This means that its always set to the expiry time of the first batch, and once that time has passed Sender starts spinning on epoll with a timeout of 0, which consumes a lot of CPU. This patch updates Sender to reset the next batch expiry time on each poll loop so that a new value reflecting the expiry time for the current set of batches is computed.) > Sender should reset next batch expiry time between poll loops > - > > Key: KAFKA-7311 > URL: https://issues.apache.org/jira/browse/KAFKA-7311 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.1.0 >Reporter: Rohan Desai >Priority: Major > Fix For: 2.1.0 > > > Sender/RecordAccumulator never resets the next batch expiry time. Its always > computed as the min of the current value and the expiry time for all batches > being processed. This means that its always set to the expiry time of the > first batch, and once that time has passed Sender starts spinning on epoll > with a timeout of 0, which consumes a lot of CPU. This patch updates Sender > to reset the next batch expiry time on each poll loop so that a new value > reflecting the expiry time for the current set of batches is computed. We hit > this when testing KSQL -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7311) Sender should reset next batch expiry time between poll loops
[ https://issues.apache.org/jira/browse/KAFKA-7311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rohan Desai updated KAFKA-7311: --- Description: Sender/RecordAccumulator never resets the next batch expiry time. Its always computed as the min of the current value and the expiry time for all batches being processed. This means that its always set to the expiry time of the first batch, and once that time has passed Sender starts spinning on epoll with a timeout of 0, which consumes a lot of CPU. This patch updates Sender to reset the next batch expiry time on each poll loop so that a new value reflecting the expiry time for the current set of batches is computed. (was: Sender does not reset next batch expiry time between poll loops. This means that once it crosses the expiry time of the first batch, it starts spinning on epoll with a timeout of 0, which consumes a lot of CPU. We observed this running KSQL when investigating why throughput would drop after about 10 minutes (the default delivery timeout).) > Sender should reset next batch expiry time between poll loops > - > > Key: KAFKA-7311 > URL: https://issues.apache.org/jira/browse/KAFKA-7311 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.1.0 >Reporter: Rohan Desai >Priority: Major > Fix For: 2.1.0 > > > Sender/RecordAccumulator never resets the next batch expiry time. Its always > computed as the min of the current value and the expiry time for all batches > being processed. This means that its always set to the expiry time of the > first batch, and once that time has passed Sender starts spinning on epoll > with a timeout of 0, which consumes a lot of CPU. This patch updates Sender > to reset the next batch expiry time on each poll loop so that a new value > reflecting the expiry time for the current set of batches is computed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7311) Sender should reset next batch expiry time between poll loops
Rohan Desai created KAFKA-7311: -- Summary: Sender should reset next batch expiry time between poll loops Key: KAFKA-7311 URL: https://issues.apache.org/jira/browse/KAFKA-7311 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 2.1.0 Reporter: Rohan Desai Fix For: 2.1.0 Sender does not reset next batch expiry time between poll loops. This means that once it crosses the expiry time of the first batch, it starts spinning on epoll with a timeout of 0, which consumes a lot of CPU. We observed this running KSQL when investigating why throughput would drop after about 10 minutes (the default delivery timeout). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
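To make the behavior described in KAFKA-7311 concrete, here is a minimal, self-contained sketch. It is illustrative only, not the actual Sender/RecordAccumulator code; names such as nextBatchExpiryMs and the pollTimeout* methods are invented for this example. It shows why carrying the minimum expiry forward across poll loops pins the poll timeout at 0 once the first batch's deadline passes, and how resetting the value at the top of each loop restores a sensible timeout.
{code:java}
import java.util.List;

// Illustrative sketch only -- not the real Sender/RecordAccumulator code.
public class ExpiryResetSketch {

    // Hypothetical field tracking the earliest batch deadline seen so far.
    static long nextBatchExpiryMs = Long.MAX_VALUE;

    // Buggy variant: the field is never reset, so a stale minimum from an
    // already-expired batch keeps the poll timeout at 0 forever.
    static long pollTimeoutBuggy(List<Long> batchDeadlinesMs, long nowMs) {
        for (long deadline : batchDeadlinesMs) {
            nextBatchExpiryMs = Math.min(nextBatchExpiryMs, deadline);
        }
        return Math.max(0, nextBatchExpiryMs - nowMs);
    }

    // Fixed variant: reset at the start of every poll loop, then recompute
    // the minimum from the batches that are actually in flight right now.
    static long pollTimeoutFixed(List<Long> batchDeadlinesMs, long nowMs) {
        nextBatchExpiryMs = Long.MAX_VALUE;
        for (long deadline : batchDeadlinesMs) {
            nextBatchExpiryMs = Math.min(nextBatchExpiryMs, deadline);
        }
        return Math.max(0, nextBatchExpiryMs - nowMs);
    }

    public static void main(String[] args) {
        // Loop 1: a batch that expired at t=100ms; current time is t=200ms.
        System.out.println(pollTimeoutBuggy(List.of(100L), 200L));  // 0
        // Loop 2: only a fresh batch expiring at t=1200ms remains, but the
        // buggy variant still returns 0, so the sender spins on epoll.
        System.out.println(pollTimeoutBuggy(List.of(1200L), 200L)); // 0
        nextBatchExpiryMs = Long.MAX_VALUE;                         // fresh start
        System.out.println(pollTimeoutFixed(List.of(1200L), 200L)); // 1000
    }
}
{code}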
[jira] [Updated] (KAFKA-7064) "Unexpected resource type GROUP" when describing broker configs using latest admin client
[ https://issues.apache.org/jira/browse/KAFKA-7064?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rohan Desai updated KAFKA-7064: --- Description: I'm getting the following error when I try to describe broker configs using the admin client: {code:java} org.apache.kafka.common.errors.InvalidRequestException: Unexpected resource type GROUP for resource 0{code} I think it's due to this commit: [https://github.com/apache/kafka/commit/49db5a63c043b50c10c2dfd0648f8d74ee917b6a] My guess at what's going on is that now that the client is using ConfigResource instead of Resource, it's sending a describe request for resource type BROKER w/ id 3, while the broker associates id 3 w/ GROUP was: I'm getting the following error when I try to describe broker configs using the admin client: ```org.apache.kafka.common.errors.InvalidRequestException: Unexpected resource type GROUP for resource 0``` I think it's due to this commit: https://github.com/apache/kafka/commit/49db5a63c043b50c10c2dfd0648f8d74ee917b6a My guess at what's going on is that now that the client is using ConfigResource instead of Resource, it's sending a describe request for resource type BROKER w/ id 3, while the broker associates id 3 w/ GROUP > "Unexpected resource type GROUP" when describing broker configs using latest > admin client > - > > Key: KAFKA-7064 > URL: https://issues.apache.org/jira/browse/KAFKA-7064 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.0.0 >Reporter: Rohan Desai >Priority: Blocker > > I'm getting the following error when I try to describe broker configs using > the admin client: > {code:java} > org.apache.kafka.common.errors.InvalidRequestException: Unexpected resource > type GROUP for resource 0{code} > I think it's due to this commit: > [https://github.com/apache/kafka/commit/49db5a63c043b50c10c2dfd0648f8d74ee917b6a] > > My guess at what's going on is that now that the client is using > ConfigResource instead of Resource, it's sending a describe request for > resource type BROKER w/ id 3, while the broker associates id 3 w/ GROUP -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7064) "Unexpected resource type GROUP" when describing broker configs using latest admin client
Rohan Desai created KAFKA-7064: -- Summary: "Unexpected resource type GROUP" when describing broker configs using latest admin client Key: KAFKA-7064 URL: https://issues.apache.org/jira/browse/KAFKA-7064 Project: Kafka Issue Type: Bug Affects Versions: 2.0.0 Reporter: Rohan Desai I'm getting the following error when I try to describe broker configs using the admin client: ```org.apache.kafka.common.errors.InvalidRequestException: Unexpected resource type GROUP for resource 0``` I think it's due to this commit: https://github.com/apache/kafka/commit/49db5a63c043b50c10c2dfd0648f8d74ee917b6a My guess at what's going on is that now that the client is using ConfigResource instead of Resource, it's sending a describe request for resource type BROKER w/ id 3, while the broker associates id 3 w/ GROUP -- This message was sent by Atlassian JIRA (v7.6.3#76005)
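For context, this is the kind of admin-client call the description refers to: describing broker configs with a ConfigResource of type BROKER. The bootstrap address and broker id are assumptions for the example; on the affected client/broker combination this call surfaces the InvalidRequestException quoted above.
{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class DescribeBrokerConfigs {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // Assumed bootstrap address; replace with your cluster's.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Broker configs are addressed by the broker id as a string.
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
            Config config = admin.describeConfigs(Collections.singleton(broker))
                                 .all().get()
                                 .get(broker);
            config.entries().forEach(entry ->
                System.out.println(entry.name() + " = " + entry.value()));
        }
    }
}
{code}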
[jira] [Reopened] (KAFKA-6383) StreamThread.shutdown doesn't clean up completely when called before StreamThread.start
[ https://issues.apache.org/jira/browse/KAFKA-6383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rohan Desai reopened KAFKA-6383: I missed a race condition in my fix. If we start and then shut down a streams thread before its run loop has executed, then shutdown() throws an IllegalThreadStateException. This happens because shutdown() uses StreamThread.state to decide whether to call start(), and the state is only transitioned from run(), which may not have executed yet. > StreamThread.shutdown doesn't clean up completely when called before > StreamThread.start > --- > > Key: KAFKA-6383 > URL: https://issues.apache.org/jira/browse/KAFKA-6383 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 >Reporter: Rohan Desai >Assignee: Rohan Desai > Fix For: 1.1.0 > > > The following code leaks a producer network thread: > {code} > ks = new KafkaStreams(...); > ks.close(); > {code} > The underlying issue is that KafkaStreams creates a bunch of StreamsThreads > via StreamThread.create, which in turn creates a bunch of stuff (including a > producer). These resources are cleaned up only when the thread exits. So if > the thread was never started, then they are never cleaned up. > StreamThread.shutdown should clean up if it sees that the thread has never > been started. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
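A standalone sketch of the race described in the reopen comment above, using plain java.lang.Thread rather than the actual StreamThread code: a state flag that is only set inside run() cannot tell shutdown() whether start() has already been called, and calling start() a second time throws IllegalThreadStateException.
{code:java}
// Standalone illustration of the race; not Kafka code.
public class StartShutdownRace {

    static class Worker extends Thread {
        private volatile boolean running = false; // only set once run() actually executes

        @Override
        public void run() {
            running = true;
            // ... work loop ...
        }

        void shutdown() {
            // Buggy decision: "not running yet, so it must never have been started".
            if (!running) {
                // Throws IllegalThreadStateException if start() was already called,
                // even though run() has not executed yet.
                start();
            }
            // ... otherwise signal the loop to stop, join, release resources ...
        }
    }

    public static void main(String[] args) {
        Worker w = new Worker();
        w.start();    // caller starts the thread...
        w.shutdown(); // ...and shuts it down before run() has flipped the flag
    }
}
{code}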
[jira] [Created] (KAFKA-6383) StreamThread.shutdown doesn't clean up completely when called beforeStreamThread.start
Rohan Desai created KAFKA-6383: -- Summary: StreamThread.shutdown doesn't clean up completely when called beforeStreamThread.start Key: KAFKA-6383 URL: https://issues.apache.org/jira/browse/KAFKA-6383 Project: Kafka Issue Type: Bug Reporter: Rohan Desai The following code leaks a producer network thread: {code} ks = new KafkaStreams(...); ks.close(); {code} The underlying issue is that KafkaStreams creates a bunch of StreamsThreads via StreamThread.create, which in turn creates a bunch of stuff (including a producer). These resources are cleaned up only when the thread exits. So if the thread was never started, then they are never cleaned up. StreamThread.shutdown should clean up if it sees that the thread has never been started. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
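One possible shape for the cleanup the description asks for, sketched outside of Kafka Streams (WorkerThreadSketch and its shutdown() method are invented for this example; the real StreamThread has its own lifecycle handling): resources created eagerly in the constructor must be released by shutdown() when the thread never ran, because run()'s cleanup path will never execute.
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

// Hypothetical worker thread; not the actual StreamThread implementation.
public class WorkerThreadSketch extends Thread {

    private final Producer<byte[], byte[]> producer;

    WorkerThreadSketch(Properties producerConfig) {
        // Created eagerly, before the thread is ever started.
        this.producer = new KafkaProducer<>(
            producerConfig, new ByteArraySerializer(), new ByteArraySerializer());
    }

    @Override
    public void run() {
        try {
            // ... poll/process loop ...
        } finally {
            producer.close(); // only reached if the thread actually ran
        }
    }

    public void shutdown() {
        if (getState() == Thread.State.NEW) {
            // Never started: run()'s finally block will never execute,
            // so release the resources here instead of leaking them.
            producer.close();
            return;
        }
        // ... otherwise signal the run loop to exit and let it clean up ...
    }
}
{code}
Checking Thread.getState() rather than a flag set inside run() also sidesteps the double-start race noted in the reopen comment, since a thread's state leaves NEW as soon as start() is called, regardless of whether run() has executed yet.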
[jira] [Updated] (KAFKA-6383) StreamThread.shutdown doesn't clean up completely when called before StreamThread.start
[ https://issues.apache.org/jira/browse/KAFKA-6383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rohan Desai updated KAFKA-6383: --- Summary: StreamThread.shutdown doesn't clean up completely when called before StreamThread.start (was: StreamThread.shutdown doesn't clean up completely when called beforeStreamThread.start) > StreamThread.shutdown doesn't clean up completely when called before > StreamThread.start > --- > > Key: KAFKA-6383 > URL: https://issues.apache.org/jira/browse/KAFKA-6383 > Project: Kafka > Issue Type: Bug >Reporter: Rohan Desai > > The following code leaks a producer network thread: > {code} > ks = new KafkaStreams(...); > ks.close(); > {code} > The underlying issue is that KafkaStreams creates a bunch of StreamsThreads > via StreamThread.create, which in turn creates a bunch of stuff (including a > producer). These resources are cleaned up only when the thread exits. So if > the thread was never started, then they are never cleaned up. > StreamThread.shutdown should clean up if it sees that the thread has never > been started. -- This message was sent by Atlassian JIRA (v6.4.14#64029)