[jira] [Commented] (KAFKA-14942) CopyOnWriteMap implements ConcurrentMap but does not implement required default methods
[ https://issues.apache.org/jira/browse/KAFKA-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17717388#comment-17717388 ] Steven Schlansker commented on KAFKA-14942: --- Ah, I am sorry, I made a mistake in this analysis. The ConcurrentMap interface replaces the default Map implementations with more advanced ones. So that should probably not be a problem. > CopyOnWriteMap implements ConcurrentMap but does not implement required > default methods > --- > > Key: KAFKA-14942 > URL: https://issues.apache.org/jira/browse/KAFKA-14942 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.4.0 >Reporter: Steven Schlansker >Priority: Minor > > Hi Kafka team, > I was reading through the kafka-clients CopyOnWriteMap while investigating a > problem in a different library, and I think it is declaring that it is a > ConcurrentMap but does not completely implement that interface. > In particular, it inherits e.g. computeIfAbsent as a default method from Map, > which is noted to be a non-atomic implementation, and is not synchronized in > any way. I think this can lead to a reader experiencing a map whose contents > are not consistent with any serial execution of write ops. > > Consider a thread T1 which calls computeIfAbsent("a", _ -> "1") > T1 computeIfAbsent calls get("a") and observes null, and is then pre-empted > T2 calls put("a", "2"), which copies the (empty) backing map and stores > \{"a": "2"} > T1 computeIfAbsent then wakes up, still thinking the value is null, and calls > put("a", "1"). > > This leads to the map finishing with the contents \{"a":"1"}, while any > serial execution of these two operations should always finish with \{"a":"2"}. > > I think CopyOnWriteMap should either re-implement all mutating default > methods at least as synchronized. If this is a special internal map and we > know those will never be called, perhaps they should throw > UnsupportedOperationException or at least document the class as not a > complete and proper implementation. > > Thank you for your consideration. -- This message was sent by Atlassian Jira (v8.20.10#820010)
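For context on why this ends up not being a problem: ConcurrentMap re-declares computeIfAbsent (and the other compound default methods) in terms of the map's own atomic operations rather than Map's plain get-then-put default. Below is a simplified sketch of that style of override; it is the general shape, not the exact JDK source, and it assumes the map's putIfAbsent is atomic (which a fully synchronized copy-on-write implementation provides).

{code:java}
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Sketch of a computeIfAbsent built on putIfAbsent instead of get-then-put.
// Because the new value is only published via putIfAbsent, a concurrent writer
// can never be silently overwritten the way it can with Map's default.
final class ConcurrentComputeIfAbsentSketch {
    static <K, V> V computeIfAbsent(ConcurrentMap<K, V> map, K key,
                                    Function<? super K, ? extends V> mappingFunction) {
        Objects.requireNonNull(mappingFunction);
        V existing = map.get(key);
        if (existing != null) {
            return existing;                          // fast path: already present
        }
        V computed = mappingFunction.apply(key);
        if (computed == null) {
            return null;                              // nothing to store
        }
        V raced = map.putIfAbsent(key, computed);     // atomic publish
        return raced != null ? raced : computed;      // a concurrent writer wins; keep its value
    }
}
{code}

In the interleaving from the description, T1's putIfAbsent("a", "1") would observe T2's "2" already present and leave it in place, so the map ends at {"a": "2"}, matching a serial execution.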
[jira] [Resolved] (KAFKA-14942) CopyOnWriteMap implements ConcurrentMap but does not implement required default methods
[ https://issues.apache.org/jira/browse/KAFKA-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Schlansker resolved KAFKA-14942. --- Resolution: Invalid > CopyOnWriteMap implements ConcurrentMap but does not implement required > default methods > --- > > Key: KAFKA-14942 > URL: https://issues.apache.org/jira/browse/KAFKA-14942 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.4.0 >Reporter: Steven Schlansker >Priority: Minor > > Hi Kafka team, > I was reading through the kafka-clients CopyOnWriteMap while investigating a > problem in a different library, and I think it is declaring that it is a > ConcurrentMap but does not completely implement that interface. > In particular, it inherits e.g. computeIfAbsent as a default method from Map, > which is noted to be a non-atomic implementation, and is not synchronized in > any way. I think this can lead to a reader experiencing a map whose contents > are not consistent with any serial execution of write ops. > > Consider a thread T1 which calls computeIfAbsent("a", _ -> "1") > T1 computeIfAbsent calls get("a") and observes null, and is then pre-empted > T2 calls put("a", "2"), which copies the (empty) backing map and stores > \{"a": "2"} > T1 computeIfAbsent then wakes up, still thinking the value is null, and calls > put("a", "1"). > > This leads to the map finishing with the contents \{"a":"1"}, while any > serial execution of these two operations should always finish with \{"a":"2"}. > > I think CopyOnWriteMap should either re-implement all mutating default > methods at least as synchronized. If this is a special internal map and we > know those will never be called, perhaps they should throw > UnsupportedOperationException or at least document the class as not a > complete and proper implementation. > > Thank you for your consideration. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14942) CopyOnWriteMap implements ConcurrentMap but does not implement required default methods
Steven Schlansker created KAFKA-14942: - Summary: CopyOnWriteMap implements ConcurrentMap but does not implement required default methods Key: KAFKA-14942 URL: https://issues.apache.org/jira/browse/KAFKA-14942 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 3.4.0 Reporter: Steven Schlansker Hi Kafka team, I was reading through the kafka-clients CopyOnWriteMap while investigating a problem in a different library, and I think it declares itself a ConcurrentMap but does not completely implement that interface. In particular, it inherits e.g. computeIfAbsent as a default method from Map, which is documented as a non-atomic implementation and is not synchronized in any way. I think this can lead to a reader observing a map whose contents are not consistent with any serial execution of the write operations. Consider a thread T1 which calls computeIfAbsent("a", _ -> "1"). T1's computeIfAbsent calls get("a"), observes null, and is then pre-empted. T2 calls put("a", "2"), which copies the (empty) backing map and stores {"a": "2"}. T1 then wakes up, still believing the value is null, and calls put("a", "1"). This leaves the map with the contents {"a": "1"}, while any serial execution of these two operations should always finish with {"a": "2"}. I think CopyOnWriteMap should either re-implement all mutating default methods, at least as synchronized, or, if this is a special internal map and we know those methods will never be called, have them throw UnsupportedOperationException or at least document the class as not a complete and proper implementation. Thank you for your consideration. -- This message was sent by Atlassian Jira (v8.20.10#820010)
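Had the default methods actually been in play, the remediation suggested in the report could look roughly like the sketch below: a hypothetical minimal copy-on-write map (not Kafka's actual org.apache.kafka.common.utils.CopyOnWriteMap) where the compound check-then-act runs under the same lock that guards writes, so the interleaving described above cannot lose T2's put.

{code:java}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical minimal copy-on-write map, for illustration only: reads go against a
// volatile immutable snapshot; writes copy the snapshot under a lock and republish it.
final class CopyOnWriteMapSketch<K, V> {
    private volatile Map<K, V> snapshot = Collections.emptyMap();

    public V get(K key) {
        return snapshot.get(key);                    // lock-free read of the current snapshot
    }

    public synchronized V put(K key, V value) {
        Map<K, V> copy = new HashMap<>(snapshot);    // copy, mutate, publish
        V previous = copy.put(key, value);
        snapshot = Collections.unmodifiableMap(copy);
        return previous;
    }

    // The report's suggestion: make the compound get-then-put atomic by holding the
    // same lock as the other mutators, instead of inheriting Map's non-atomic default.
    public synchronized V computeIfAbsent(K key, Function<? super K, ? extends V> fn) {
        V existing = snapshot.get(key);
        if (existing != null) {
            return existing;
        }
        V computed = fn.apply(key);
        if (computed != null) {
            put(key, computed);                      // synchronized is reentrant, so this is safe
        }
        return computed;
    }
}
{code}

With both put and computeIfAbsent holding the lock, the two operations from the example serialize and the map always ends at {"a": "2"}.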
[jira] [Resolved] (KAFKA-13593) ThrottledChannelReaper slows broker shutdown by multiple seconds
[ https://issues.apache.org/jira/browse/KAFKA-13593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Schlansker resolved KAFKA-13593. --- Resolution: Fixed > ThrottledChannelReaper slows broker shutdown by multiple seconds > > > Key: KAFKA-13593 > URL: https://issues.apache.org/jira/browse/KAFKA-13593 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 3.0.0 >Reporter: Steven Schlansker >Priority: Minor > > We run an embedded KRaft broker in integration tests, to test that our > Producer / Consumers are all hooked up and verify our overall pipeline. > While trying to deliver CI speed improvements, we noticed that the majority > of time for a small test is actually spent in Kafka broker shutdown. From the > logs, it looks like the ClientQuotaManager has multiple > ThrottledChannelReaper threads and each of them takes up to a second to > shutdown. > {code:java} > 2022-01-12T15:26:31.932Z [main] INFO kafka.log.LogManager - Shutdown > complete. > 2022-01-12T15:26:31.934Z [main] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-Fetch]: Shutting down > 2022-01-12T15:26:32.311Z [main] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-Fetch]: Shutdown completed > 2022-01-12T15:26:32.311Z [ThrottledChannelReaper-Fetch] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-Fetch]: Stopped > 2022-01-12T15:26:32.311Z [main] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-Produce]: Shutting down > 2022-01-12T15:26:33.312Z [ThrottledChannelReaper-Produce] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-Produce]: Stopped > 2022-01-12T15:26:33.312Z [main] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-Produce]: Shutdown completed > 2022-01-12T15:26:33.312Z [main] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-Request]: Shutting down > 2022-01-12T15:26:34.315Z [ThrottledChannelReaper-Request] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-Request]: Stopped > 2022-01-12T15:26:34.315Z [main] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-Request]: Shutdown completed > 2022-01-12T15:26:34.315Z [main] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-ControllerMutation]: Shutting down > 2022-01-12T15:26:35.317Z [main] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-ControllerMutation]: Shutdown completed > 2022-01-12T15:26:35.317Z [ThrottledChannelReaper-ControllerMutation] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-ControllerMutation]: Stopped{code} > Inspecting the code, the ThrottledChannelReaper threads are marked as not > interruptible, so ShutdownableThread does not interrupt the worker on > shutdown. Unfortunately, the doWork method polls with a 1 second timeout. So > you wait up to 1s for every type of quota, in this case for a total of almost > 4s. > > While this is not a problem for production Kafka brokers, where instances are > expected to stay up for months, in tests that expect to spin up and down it > is easily noticed and adds real overhead to CI. 
> > Suggested possible remediations: > * Allow ThrottledChannelReaper to be interrupted by ShutdownableThread > * ThrottledChannelReaper overrides initiateShutdown and after calling > {{super.initiateShutdown}} then enqueues a {{null}} element on the delayQueue > to force worker thread to notice shutdown state > * Reduce 1s poll timeout to something small (carries overhead penalty for > all users though, so this is less desirable), or make it configurable so we > can set it to e.g. 10ms in unit tests -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (KAFKA-13593) ThrottledChannelReaper slows broker shutdown by multiple seconds
[ https://issues.apache.org/jira/browse/KAFKA-13593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17475573#comment-17475573 ] Steven Schlansker commented on KAFKA-13593: --- Perfect! Glad I am not the only one that noticed this :) Thank you for the fix, we'll look forward to seeing it in an update soon. (3.0.1?) > ThrottledChannelReaper slows broker shutdown by multiple seconds > > > Key: KAFKA-13593 > URL: https://issues.apache.org/jira/browse/KAFKA-13593 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 3.0.0 >Reporter: Steven Schlansker >Priority: Minor > > We run an embedded KRaft broker in integration tests, to test that our > Producer / Consumers are all hooked up and verify our overall pipeline. > While trying to deliver CI speed improvements, we noticed that the majority > of time for a small test is actually spent in Kafka broker shutdown. From the > logs, it looks like the ClientQuotaManager has multiple > ThrottledChannelReaper threads and each of them takes up to a second to > shutdown. > {code:java} > 2022-01-12T15:26:31.932Z [main] INFO kafka.log.LogManager - Shutdown > complete. > 2022-01-12T15:26:31.934Z [main] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-Fetch]: Shutting down > 2022-01-12T15:26:32.311Z [main] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-Fetch]: Shutdown completed > 2022-01-12T15:26:32.311Z [ThrottledChannelReaper-Fetch] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-Fetch]: Stopped > 2022-01-12T15:26:32.311Z [main] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-Produce]: Shutting down > 2022-01-12T15:26:33.312Z [ThrottledChannelReaper-Produce] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-Produce]: Stopped > 2022-01-12T15:26:33.312Z [main] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-Produce]: Shutdown completed > 2022-01-12T15:26:33.312Z [main] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-Request]: Shutting down > 2022-01-12T15:26:34.315Z [ThrottledChannelReaper-Request] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-Request]: Stopped > 2022-01-12T15:26:34.315Z [main] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-Request]: Shutdown completed > 2022-01-12T15:26:34.315Z [main] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-ControllerMutation]: Shutting down > 2022-01-12T15:26:35.317Z [main] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-ControllerMutation]: Shutdown completed > 2022-01-12T15:26:35.317Z [ThrottledChannelReaper-ControllerMutation] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-ControllerMutation]: Stopped{code} > Inspecting the code, the ThrottledChannelReaper threads are marked as not > interruptible, so ShutdownableThread does not interrupt the worker on > shutdown. Unfortunately, the doWork method polls with a 1 second timeout. So > you wait up to 1s for every type of quota, in this case for a total of almost > 4s. > > While this is not a problem for production Kafka brokers, where instances are > expected to stay up for months, in tests that expect to spin up and down it > is easily noticed and adds real overhead to CI. 
> > Suggested possible remediations: > * Allow ThrottledChannelReaper to be interrupted by ShutdownableThread > * ThrottledChannelReaper overrides initiateShutdown and after calling > {{super.initiateShutdown}} then enqueues a {{null}} element on the delayQueue > to force worker thread to notice shutdown state > * Reduce 1s poll timeout to something small (carries overhead penalty for > all users though, so this is less desirable), or make it configurable so we > can set it to e.g. 10ms in unit tests -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13593) ThrottledChannelReaper slows broker shutdown by multiple seconds
Steven Schlansker created KAFKA-13593: - Summary: ThrottledChannelReaper slows broker shutdown by multiple seconds Key: KAFKA-13593 URL: https://issues.apache.org/jira/browse/KAFKA-13593 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 3.0.0 Reporter: Steven Schlansker We run an embedded KRaft broker in integration tests, to test that our Producer / Consumers are all hooked up and verify our overall pipeline. While trying to deliver CI speed improvements, we noticed that the majority of time for a small test is actually spent in Kafka broker shutdown. From the logs, it looks like the ClientQuotaManager has multiple ThrottledChannelReaper threads and each of them takes up to a second to shutdown. {code:java} 2022-01-12T15:26:31.932Z [main] INFO kafka.log.LogManager - Shutdown complete. 2022-01-12T15:26:31.934Z [main] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Fetch]: Shutting down 2022-01-12T15:26:32.311Z [main] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Fetch]: Shutdown completed 2022-01-12T15:26:32.311Z [ThrottledChannelReaper-Fetch] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Fetch]: Stopped 2022-01-12T15:26:32.311Z [main] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Produce]: Shutting down 2022-01-12T15:26:33.312Z [ThrottledChannelReaper-Produce] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Produce]: Stopped 2022-01-12T15:26:33.312Z [main] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Produce]: Shutdown completed 2022-01-12T15:26:33.312Z [main] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Request]: Shutting down 2022-01-12T15:26:34.315Z [ThrottledChannelReaper-Request] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Request]: Stopped 2022-01-12T15:26:34.315Z [main] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Request]: Shutdown completed 2022-01-12T15:26:34.315Z [main] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-ControllerMutation]: Shutting down 2022-01-12T15:26:35.317Z [main] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-ControllerMutation]: Shutdown completed 2022-01-12T15:26:35.317Z [ThrottledChannelReaper-ControllerMutation] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-ControllerMutation]: Stopped{code} Inspecting the code, the ThrottledChannelReaper threads are marked as not interruptible, so ShutdownableThread does not interrupt the worker on shutdown. Unfortunately, the doWork method polls with a 1 second timeout. So you wait up to 1s for every type of quota, in this case for a total of almost 4s. While this is not a problem for production Kafka brokers, where instances are expected to stay up for months, in tests that expect to spin up and down it is easily noticed and adds real overhead to CI. Suggested possible remediations: * Allow ThrottledChannelReaper to be interrupted by ShutdownableThread * ThrottledChannelReaper overrides initiateShutdown and after calling {{super.initiateShutdown}} then enqueues a {{null}} element on the delayQueue to force worker thread to notice shutdown state * Reduce 1s poll timeout to something small (carries overhead penalty for all users though, so this is less desirable), or make it configurable so we can set it to e.g. 
10ms in unit tests (see the sketch after this message for the second option) -- This message was sent by Atlassian Jira (v8.20.1#820001)
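The second remediation above could look roughly like the following sketch. This is an illustrative Java rendering of the general pattern, not the actual Scala ShutdownableThread/ThrottledChannelReaper code, and the class and method names are invented for the example; note that java.util.concurrent.DelayQueue rejects null elements, so an already-expired sentinel is enqueued here instead of a literal null.

{code:java}
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative reaper-style worker: the doWork-style loop polls a DelayQueue with a 1s
// timeout, so a plain shutdown flag is noticed up to a second late. Offering an
// already-expired sentinel at shutdown wakes the poll immediately.
public class ThrottledReaperSketch extends Thread {
    interface DelayedItem extends Delayed {}

    // An element whose delay has already elapsed; poll() hands it out at once.
    private static final class ShutdownSentinel implements DelayedItem {
        @Override public long getDelay(TimeUnit unit) { return 0; }
        @Override public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    private final DelayQueue<DelayedItem> delayQueue = new DelayQueue<>();
    private final AtomicBoolean running = new AtomicBoolean(true);

    public void initiateShutdown() {
        running.set(false);
        delayQueue.offer(new ShutdownSentinel());    // wake the worker instead of waiting out the poll
    }

    @Override
    public void run() {
        while (running.get()) {
            try {
                DelayedItem item = delayQueue.poll(1, TimeUnit.SECONDS);
                if (item == null || item instanceof ShutdownSentinel) {
                    continue;                        // nothing expired, or shutdown was requested
                }
                // ... process the expired element ...
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
{code}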
[jira] [Commented] (KAFKA-6767) OffsetCheckpoint write assumes parent directory exists
[ https://issues.apache.org/jira/browse/KAFKA-6767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16436442#comment-16436442 ] Steven Schlansker commented on KAFKA-6767: -- If you expect it to be created as a precondition, then yes I agree, this seems to indicate that either there was an insufficient fix, race, or some other bug :( > OffsetCheckpoint write assumes parent directory exists > -- > > Key: KAFKA-6767 > URL: https://issues.apache.org/jira/browse/KAFKA-6767 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Steven Schlansker >Priority: Minor > > We run Kafka Streams with RocksDB state stores on ephemeral disks (i.e. if an > instance dies it is created from scratch, rather than reusing the existing > RocksDB.) > We routinely see: > {code:java} > 2018-04-09T19:14:35.004Z WARN <> > [chat-0319e3c3-d8b2-4c60-bd69-a8484d8d4435-StreamThread-1] > o.a.k.s.p.i.ProcessorStateManager - task [0_11] Failed to write offset > checkpoint file to /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint: {} > java.io.FileNotFoundException: > /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open0(Native Method) > at java.io.FileOutputStream.open(FileOutputStream.java:270) > at java.io.FileOutputStream.(FileOutputStream.java:213) > at java.io.FileOutputStream.(FileOutputStream.java:162) > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:78) > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:320) > at > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:314) > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307) > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297) > at > org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347) > at > org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403) > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720){code} > Inspecting the state store directory, I can indeed see that {{chat/0_11}} > does not exist (although many other partitions do). > > Looking at the OffsetCheckpoint write method, it seems to try to open a new > checkpoint file without first ensuring that the parent directory exists. 
> > {code:java} > public void write(final Map offsets) throws > IOException { > // if there is no offsets, skip writing the file to save disk IOs > if (offsets.isEmpty()) { > return; > } > synchronized (lock) { > // write to temp file and then swap with the existing file > final File temp = new File(file.getAbsolutePath() + ".tmp");{code} > > Either the OffsetCheckpoint class should initialize the directories if > needed, or some precondition of it being called should ensure that is the > case. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6767) OffsetCheckpoint write assumes parent directory exists
Steven Schlansker created KAFKA-6767: Summary: OffsetCheckpoint write assumes parent directory exists Key: KAFKA-6767 URL: https://issues.apache.org/jira/browse/KAFKA-6767 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.1.0 Reporter: Steven Schlansker We run Kafka Streams with RocksDB state stores on ephemeral disks (i.e. if an instance dies it is created from scratch, rather than reusing the existing RocksDB.) We routinely see: {code:java} 2018-04-09T19:14:35.004Z WARN <> [chat-0319e3c3-d8b2-4c60-bd69-a8484d8d4435-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager - task [0_11] Failed to write offset checkpoint file to /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint: {} java.io.FileNotFoundException: /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint.tmp (No such file or directory) at java.io.FileOutputStream.open0(Native Method) at java.io.FileOutputStream.open(FileOutputStream.java:270) at java.io.FileOutputStream.(FileOutputStream.java:213) at java.io.FileOutputStream.(FileOutputStream.java:162) at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:78) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:320) at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:314) at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307) at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297) at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67) at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357) at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347) at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403) at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720){code} Inspecting the state store directory, I can indeed see that {{chat/0_11}} does not exist (although many other partitions do). Looking at the OffsetCheckpoint write method, it seems to try to open a new checkpoint file without first ensuring that the parent directory exists. {code:java} public void write(final Map offsets) throws IOException { // if there is no offsets, skip writing the file to save disk IOs if (offsets.isEmpty()) { return; } synchronized (lock) { // write to temp file and then swap with the existing file final File temp = new File(file.getAbsolutePath() + ".tmp");{code} Either the OffsetCheckpoint class should initialize the directories if needed, or some precondition of it being called should ensure that is the case. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
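The first option mentioned above (have OffsetCheckpoint initialize the directory itself) could be as small as creating the parent directory before opening the temp file. A minimal sketch of that idea, with a hypothetical method name and no claim that this is how the eventual fix was written:

{code:java}
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

// Illustrative sketch: ensure the task directory exists before opening the temp
// checkpoint file, so a missing parent no longer surfaces as FileNotFoundException.
public class CheckpointWriteSketch {
    public static File prepareTempFile(File checkpointFile) throws IOException {
        File parent = checkpointFile.getParentFile();
        if (parent != null) {
            Files.createDirectories(parent.toPath());   // no-op if the directory already exists
        }
        // the caller would then write the offsets to this temp file, flush/sync,
        // and atomically rename it onto checkpointFile
        return new File(checkpointFile.getAbsolutePath() + ".tmp");
    }
}
{code}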
[jira] [Commented] (KAFKA-4787) KafkaStreams close() is not reentrant
[ https://issues.apache.org/jira/browse/KAFKA-4787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16145979#comment-16145979 ] Steven Schlansker commented on KAFKA-4787: -- Thanks for following up. I inspected the code as of 0.11.0.0 and it looks like it's much better now (joining happens on a background thread). Will report back if I observe further deadlocks. > KafkaStreams close() is not reentrant > - > > Key: KAFKA-4787 > URL: https://issues.apache.org/jira/browse/KAFKA-4787 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Steven Schlansker > > While building a simple application, I tried to implement a failure policy > where any uncaught exception terminates the application until an > administrator can evaluate and intervene: > {code} > /** Handle any uncaught exception by shutting down the program. */ > private void handleStreamException(Thread thread, Throwable t) { > LOG.error("stream exception in thread {}", thread, t); > streams.close(); > } > streams.setUncaughtExceptionHandler(this::handleStreamException); > streams.start(); > {code} > Unfortunately, because the KafkaStreams#close() method takes a lock, this is > prone to what looks like a deadlock: > {code} > "StreamThread-1" #80 prio=5 os_prio=0 tid=0x7f56096f4000 nid=0x40c8 > waiting for monitor entry [0x7f54f03ee000] >java.lang.Thread.State: BLOCKED (on object monitor) > at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java) > - waiting to lock <0xf171cda8> (a > org.apache.kafka.streams.KafkaStreams) > at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:438) > at > com.opentable.chat.service.ChatStorage$$Lambda$161/1940967023.close(Unknown > Source) > at > com.opentable.chat.service.ChatStorage.closeLog(ChatStorage.java:212) > at com.opentable.chat.service.ChatStorage.close(ChatStorage.java:207) > at > com.opentable.chat.service.ChatStorage.handleStreamException(ChatStorage.java:541) > at > com.opentable.chat.service.ChatStorage$$Lambda$123/149062221.uncaughtException(Unknown > Source) > at java.lang.Thread.dispatchUncaughtException(Thread.java:1956) > "main" #1 prio=5 os_prio=0 tid=0x7f5608011000 nid=0x3f76 in Object.wait() > [0x7f5610f04000] >java.lang.Thread.State: WAITING (on object monitor) > at java.lang.Object.wait(Native Method) > at java.lang.Thread.join(Thread.java:1249) > - locked <0xfd302bf0> (a java.lang.Thread) > at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:494) > - locked <0xf171cda8> (a > org.apache.kafka.streams.KafkaStreams) > at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:438) > at > com.opentable.chat.service.ChatStorage$$Lambda$161/1940967023.close(Unknown > Source) > at > com.opentable.chat.service.ChatStorage.closeLog(ChatStorage.java:212) > at com.opentable.chat.service.ChatStorage.close(ChatStorage.java:207) > {code} > Note how the main thread calls close(), which encounters an exception. It > uses a StreamThread to dispatch to the handler, which calls close(). Once it > tries to take the monitor, we are left in a position where main is joined on > StreamThread-1, but StreamThread-1 is waiting for main to release that > monitor. > Arguably it's a bit abusive to call close() in this way (it certainly wasn't > intentional) -- but to make Kafka Streams robust it should handle any > sequence of close() invocations in particular gracefully. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
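For applications on the affected versions, one workaround for this pattern is to keep close() off the stream threads entirely: the uncaught exception handler only hands shutdown to another thread, so the handler returns, the stream thread can die, and close() can join it without the two sides waiting on each other. A hedged sketch, assuming the surrounding application owns the KafkaStreams instance (the executor and class name here are illustrative, not part of the Kafka API):

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.streams.KafkaStreams;

// Shut the topology down on any uncaught stream exception, but do it from a
// dedicated thread rather than from inside the stream thread's handler.
public class ShutdownOnStreamError {
    private final ExecutorService closer = Executors.newSingleThreadExecutor();

    public ShutdownOnStreamError(KafkaStreams streams) {
        streams.setUncaughtExceptionHandler((thread, error) -> {
            // log the failure, then close asynchronously so this stream thread can exit
            closer.submit(streams::close);
        });
    }
}
{code}

This matches the observation in the comment above that newer releases move the join onto a background thread inside close() itself.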