[jira] [Commented] (KAFKA-14942) CopyOnWriteMap implements ConcurrentMap but does not implement required default methods

2023-04-27 Thread Steven Schlansker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17717388#comment-17717388
 ] 

Steven Schlansker commented on KAFKA-14942:
---

Ah, I am sorry, I made a mistake in this analysis. The ConcurrentMap interface 
replaces the default Map implementations with more advanced ones. So that 
should probably not be a problem.

> CopyOnWriteMap implements ConcurrentMap but does not implement required 
> default methods
> ---
>
> Key: KAFKA-14942
> URL: https://issues.apache.org/jira/browse/KAFKA-14942
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.4.0
>Reporter: Steven Schlansker
>Priority: Minor
>
> Hi Kafka team,
> I was reading through the kafka-clients CopyOnWriteMap while investigating a 
> problem in a different library, and I think it is declaring that it is a 
> ConcurrentMap but does not completely implement that interface.
> In particular, it inherits e.g. computeIfAbsent as a default method from Map, 
> which is noted to be a non-atomic implementation, and is not synchronized in 
> any way. I think this can lead to a reader experiencing a map whose contents 
> are not consistent with any serial execution of write ops.
>  
> Consider a thread T1 which calls computeIfAbsent("a", _ -> "1")
> T1 computeIfAbsent calls get("a") and observes null, and is then pre-empted
> T2 calls put("a", "2"), which copies the (empty) backing map and stores 
> \{"a": "2"}
> T1 computeIfAbsent then wakes up, still thinking the value is null, and calls 
> put("a", "1").
>  
> This leads to the map finishing with the contents \{"a":"1"}, while any 
> serial execution of these two operations should always finish with \{"a":"2"}.
>  
> I think CopyOnWriteMap should either re-implement all mutating default 
> methods at least as synchronized. If this is a special internal map and we 
> know those will never be called, perhaps they should throw 
> UnsupportedOperationException or at least document the class as not a 
> complete and proper implementation.
>  
> Thank you for your consideration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14942) CopyOnWriteMap implements ConcurrentMap but does not implement required default methods

2023-04-27 Thread Steven Schlansker (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Schlansker resolved KAFKA-14942.
---
Resolution: Invalid

> CopyOnWriteMap implements ConcurrentMap but does not implement required 
> default methods
> ---
>
> Key: KAFKA-14942
> URL: https://issues.apache.org/jira/browse/KAFKA-14942
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.4.0
>Reporter: Steven Schlansker
>Priority: Minor
>
> Hi Kafka team,
> I was reading through the kafka-clients CopyOnWriteMap while investigating a 
> problem in a different library, and I think it is declaring that it is a 
> ConcurrentMap but does not completely implement that interface.
> In particular, it inherits e.g. computeIfAbsent as a default method from Map, 
> which is noted to be a non-atomic implementation, and is not synchronized in 
> any way. I think this can lead to a reader experiencing a map whose contents 
> are not consistent with any serial execution of write ops.
>  
> Consider a thread T1 which calls computeIfAbsent("a", _ -> "1")
> T1 computeIfAbsent calls get("a") and observes null, and is then pre-empted
> T2 calls put("a", "2"), which copies the (empty) backing map and stores 
> \{"a": "2"}
> T1 computeIfAbsent then wakes up, still thinking the value is null, and calls 
> put("a", "1").
>  
> This leads to the map finishing with the contents \{"a":"1"}, while any 
> serial execution of these two operations should always finish with \{"a":"2"}.
>  
> I think CopyOnWriteMap should either re-implement all mutating default 
> methods at least as synchronized. If this is a special internal map and we 
> know those will never be called, perhaps they should throw 
> UnsupportedOperationException or at least document the class as not a 
> complete and proper implementation.
>  
> Thank you for your consideration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14942) CopyOnWriteMap implements ConcurrentMap but does not implement required default methods

2023-04-26 Thread Steven Schlansker (Jira)
Steven Schlansker created KAFKA-14942:
-

 Summary: CopyOnWriteMap implements ConcurrentMap but does not 
implement required default methods
 Key: KAFKA-14942
 URL: https://issues.apache.org/jira/browse/KAFKA-14942
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 3.4.0
Reporter: Steven Schlansker


Hi Kafka team,

I was reading through the kafka-clients CopyOnWriteMap while investigating a 
problem in a different library, and I think it is declaring that it is a 
ConcurrentMap but does not completely implement that interface.

In particular, it inherits e.g. computeIfAbsent as a default method from Map, 
which is noted to be a non-atomic implementation, and is not synchronized in 
any way. I think this can lead to a reader experiencing a map whose contents 
are not consistent with any serial execution of write ops.

 

Consider a thread T1 which calls computeIfAbsent("a", _ -> "1")

T1 computeIfAbsent calls get("a") and observes null, and is then pre-empted

T2 calls put("a", "2"), which copies the (empty) backing map and stores \{"a": 
"2"}

T1 computeIfAbsent then wakes up, still thinking the value is null, and calls 
put("a", "1").

 

This leads to the map finishing with the contents \{"a":"1"}, while any serial 
execution of these two operations should always finish with \{"a":"2"}.

 

I think CopyOnWriteMap should either re-implement all mutating default methods 
at least as synchronized. If this is a special internal map and we know those 
will never be called, perhaps they should throw UnsupportedOperationException 
or at least document the class as not a complete and proper implementation.

 

Thank you for your consideration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13593) ThrottledChannelReaper slows broker shutdown by multiple seconds

2022-01-13 Thread Steven Schlansker (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Schlansker resolved KAFKA-13593.
---
Resolution: Fixed

> ThrottledChannelReaper slows broker shutdown by multiple seconds
> 
>
> Key: KAFKA-13593
> URL: https://issues.apache.org/jira/browse/KAFKA-13593
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.0.0
>Reporter: Steven Schlansker
>Priority: Minor
>
> We run an embedded KRaft broker in integration tests, to test that our 
> Producer / Consumers are all hooked up and verify our overall pipeline.
> While trying to deliver CI speed improvements, we noticed that the majority 
> of time for a small test is actually spent in Kafka broker shutdown. From the 
> logs, it looks like the ClientQuotaManager has multiple 
> ThrottledChannelReaper threads and each of them takes up to a second to 
> shutdown.
> {code:java}
> 2022-01-12T15:26:31.932Z [main] INFO  kafka.log.LogManager - Shutdown 
> complete.
> 2022-01-12T15:26:31.934Z [main] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-Fetch]: Shutting down
> 2022-01-12T15:26:32.311Z [main] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-Fetch]: Shutdown completed
> 2022-01-12T15:26:32.311Z [ThrottledChannelReaper-Fetch] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-Fetch]: Stopped
> 2022-01-12T15:26:32.311Z [main] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-Produce]: Shutting down
> 2022-01-12T15:26:33.312Z [ThrottledChannelReaper-Produce] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-Produce]: Stopped
> 2022-01-12T15:26:33.312Z [main] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-Produce]: Shutdown completed
> 2022-01-12T15:26:33.312Z [main] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-Request]: Shutting down
> 2022-01-12T15:26:34.315Z [ThrottledChannelReaper-Request] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-Request]: Stopped
> 2022-01-12T15:26:34.315Z [main] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-Request]: Shutdown completed
> 2022-01-12T15:26:34.315Z [main] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-ControllerMutation]: Shutting down
> 2022-01-12T15:26:35.317Z [main] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-ControllerMutation]: Shutdown completed
> 2022-01-12T15:26:35.317Z [ThrottledChannelReaper-ControllerMutation] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-ControllerMutation]: Stopped{code}
> Inspecting the code, the ThrottledChannelReaper threads are marked as not 
> interruptible, so ShutdownableThread does not interrupt the worker on 
> shutdown. Unfortunately, the doWork method polls with a 1 second timeout. So 
> you wait up to 1s for every type of quota, in this case for a total of almost 
> 4s.
>  
> While this is not a problem for production Kafka brokers, where instances are 
> expected to stay up for months, in tests that expect to spin up and down it 
> is easily noticed and adds real overhead to CI.
>  
> Suggested possible remediations:
>  * Allow ThrottledChannelReaper to be interrupted by ShutdownableThread
>  * ThrottledChannelReaper overrides initiateShutdown and after calling 
> {{super.initiateShutdown}} then enqueues a {{null}} element on the delayQueue 
> to force worker thread to notice shutdown state
>  * Reduce 1s poll timeout to something small (carries overhead penalty for 
> all users though, so this is less desirable), or make it configurable so we 
> can set it to e.g. 10ms in unit tests



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13593) ThrottledChannelReaper slows broker shutdown by multiple seconds

2022-01-13 Thread Steven Schlansker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17475573#comment-17475573
 ] 

Steven Schlansker commented on KAFKA-13593:
---

Perfect! Glad I am not the only one that noticed this :) Thank you for the fix, 
we'll look forward to seeing it in an update soon. (3.0.1?)

> ThrottledChannelReaper slows broker shutdown by multiple seconds
> 
>
> Key: KAFKA-13593
> URL: https://issues.apache.org/jira/browse/KAFKA-13593
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.0.0
>Reporter: Steven Schlansker
>Priority: Minor
>
> We run an embedded KRaft broker in integration tests, to test that our 
> Producer / Consumers are all hooked up and verify our overall pipeline.
> While trying to deliver CI speed improvements, we noticed that the majority 
> of time for a small test is actually spent in Kafka broker shutdown. From the 
> logs, it looks like the ClientQuotaManager has multiple 
> ThrottledChannelReaper threads and each of them takes up to a second to 
> shutdown.
> {code:java}
> 2022-01-12T15:26:31.932Z [main] INFO  kafka.log.LogManager - Shutdown 
> complete.
> 2022-01-12T15:26:31.934Z [main] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-Fetch]: Shutting down
> 2022-01-12T15:26:32.311Z [main] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-Fetch]: Shutdown completed
> 2022-01-12T15:26:32.311Z [ThrottledChannelReaper-Fetch] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-Fetch]: Stopped
> 2022-01-12T15:26:32.311Z [main] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-Produce]: Shutting down
> 2022-01-12T15:26:33.312Z [ThrottledChannelReaper-Produce] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-Produce]: Stopped
> 2022-01-12T15:26:33.312Z [main] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-Produce]: Shutdown completed
> 2022-01-12T15:26:33.312Z [main] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-Request]: Shutting down
> 2022-01-12T15:26:34.315Z [ThrottledChannelReaper-Request] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-Request]: Stopped
> 2022-01-12T15:26:34.315Z [main] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-Request]: Shutdown completed
> 2022-01-12T15:26:34.315Z [main] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-ControllerMutation]: Shutting down
> 2022-01-12T15:26:35.317Z [main] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-ControllerMutation]: Shutdown completed
> 2022-01-12T15:26:35.317Z [ThrottledChannelReaper-ControllerMutation] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-ControllerMutation]: Stopped{code}
> Inspecting the code, the ThrottledChannelReaper threads are marked as not 
> interruptible, so ShutdownableThread does not interrupt the worker on 
> shutdown. Unfortunately, the doWork method polls with a 1 second timeout. So 
> you wait up to 1s for every type of quota, in this case for a total of almost 
> 4s.
>  
> While this is not a problem for production Kafka brokers, where instances are 
> expected to stay up for months, in tests that expect to spin up and down it 
> is easily noticed and adds real overhead to CI.
>  
> Suggested possible remediations:
>  * Allow ThrottledChannelReaper to be interrupted by ShutdownableThread
>  * ThrottledChannelReaper overrides initiateShutdown and after calling 
> {{super.initiateShutdown}} then enqueues a {{null}} element on the delayQueue 
> to force worker thread to notice shutdown state
>  * Reduce 1s poll timeout to something small (carries overhead penalty for 
> all users though, so this is less desirable), or make it configurable so we 
> can set it to e.g. 10ms in unit tests



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13593) ThrottledChannelReaper slows broker shutdown by multiple seconds

2022-01-12 Thread Steven Schlansker (Jira)
Steven Schlansker created KAFKA-13593:
-

 Summary: ThrottledChannelReaper slows broker shutdown by multiple 
seconds
 Key: KAFKA-13593
 URL: https://issues.apache.org/jira/browse/KAFKA-13593
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 3.0.0
Reporter: Steven Schlansker


We run an embedded KRaft broker in integration tests, to test that our Producer 
/ Consumers are all hooked up and verify our overall pipeline.

While trying to deliver CI speed improvements, we noticed that the majority of 
time for a small test is actually spent in Kafka broker shutdown. From the 
logs, it looks like the ClientQuotaManager has multiple ThrottledChannelReaper 
threads and each of them takes up to a second to shutdown.
{code:java}
2022-01-12T15:26:31.932Z [main] INFO  kafka.log.LogManager - Shutdown complete.
2022-01-12T15:26:31.934Z [main] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Fetch]: 
Shutting down
2022-01-12T15:26:32.311Z [main] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Fetch]: 
Shutdown completed
2022-01-12T15:26:32.311Z [ThrottledChannelReaper-Fetch] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Fetch]: 
Stopped
2022-01-12T15:26:32.311Z [main] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - 
[ThrottledChannelReaper-Produce]: Shutting down
2022-01-12T15:26:33.312Z [ThrottledChannelReaper-Produce] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - 
[ThrottledChannelReaper-Produce]: Stopped
2022-01-12T15:26:33.312Z [main] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - 
[ThrottledChannelReaper-Produce]: Shutdown completed
2022-01-12T15:26:33.312Z [main] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - 
[ThrottledChannelReaper-Request]: Shutting down
2022-01-12T15:26:34.315Z [ThrottledChannelReaper-Request] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - 
[ThrottledChannelReaper-Request]: Stopped
2022-01-12T15:26:34.315Z [main] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - 
[ThrottledChannelReaper-Request]: Shutdown completed
2022-01-12T15:26:34.315Z [main] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - 
[ThrottledChannelReaper-ControllerMutation]: Shutting down
2022-01-12T15:26:35.317Z [main] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - 
[ThrottledChannelReaper-ControllerMutation]: Shutdown completed
2022-01-12T15:26:35.317Z [ThrottledChannelReaper-ControllerMutation] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - 
[ThrottledChannelReaper-ControllerMutation]: Stopped{code}
Inspecting the code, the ThrottledChannelReaper threads are marked as not 
interruptible, so ShutdownableThread does not interrupt the worker on shutdown. 
Unfortunately, the doWork method polls with a 1 second timeout. So you wait up 
to 1s for every type of quota, in this case for a total of almost 4s.

 

While this is not a problem for production Kafka brokers, where instances are 
expected to stay up for months, in tests that expect to spin up and down it is 
easily noticed and adds real overhead to CI.

 

Suggested possible remediations:
 * Allow ThrottledChannelReaper to be interrupted by ShutdownableThread
 * ThrottledChannelReaper overrides initiateShutdown and after calling 
{{super.initiateShutdown}} then enqueues a {{null}} element on the delayQueue 
to force worker thread to notice shutdown state
 * Reduce 1s poll timeout to something small (carries overhead penalty for all 
users though, so this is less desirable), or make it configurable so we can set 
it to e.g. 10ms in unit tests



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-6767) OffsetCheckpoint write assumes parent directory exists

2018-04-12 Thread Steven Schlansker (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16436442#comment-16436442
 ] 

Steven Schlansker commented on KAFKA-6767:
--

If you expect it to be created as a precondition, then yes I agree, this seems 
to indicate that either there was an insufficient fix, race, or some other bug 
:(

 

> OffsetCheckpoint write assumes parent directory exists
> --
>
> Key: KAFKA-6767
> URL: https://issues.apache.org/jira/browse/KAFKA-6767
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Steven Schlansker
>Priority: Minor
>
> We run Kafka Streams with RocksDB state stores on ephemeral disks (i.e. if an 
> instance dies it is created from scratch, rather than reusing the existing 
> RocksDB.)
> We routinely see:
> {code:java}
> 2018-04-09T19:14:35.004Z WARN <> 
> [chat-0319e3c3-d8b2-4c60-bd69-a8484d8d4435-StreamThread-1] 
> o.a.k.s.p.i.ProcessorStateManager - task [0_11] Failed to write offset 
> checkpoint file to /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint: {}
> java.io.FileNotFoundException: 
> /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open0(Native Method)
> at java.io.FileOutputStream.open(FileOutputStream.java:270)
> at java.io.FileOutputStream.(FileOutputStream.java:213)
> at java.io.FileOutputStream.(FileOutputStream.java:162)
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:78)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:320)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:314)
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297)
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357)
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720){code}
> Inspecting the state store directory, I can indeed see that {{chat/0_11}} 
> does not exist (although many other partitions do).
>  
> Looking at the OffsetCheckpoint write method, it seems to try to open a new 
> checkpoint file without first ensuring that the parent directory exists.
>  
> {code:java}
>     public void write(final Map offsets) throws 
> IOException {
>     // if there is no offsets, skip writing the file to save disk IOs
>     if (offsets.isEmpty()) {
>     return;
>     }
>     synchronized (lock) {
>     // write to temp file and then swap with the existing file
>     final File temp = new File(file.getAbsolutePath() + ".tmp");{code}
>  
> Either the OffsetCheckpoint class should initialize the directories if 
> needed, or some precondition of it being called should ensure that is the 
> case.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6767) OffsetCheckpoint write assumes parent directory exists

2018-04-09 Thread Steven Schlansker (JIRA)
Steven Schlansker created KAFKA-6767:


 Summary: OffsetCheckpoint write assumes parent directory exists
 Key: KAFKA-6767
 URL: https://issues.apache.org/jira/browse/KAFKA-6767
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.1.0
Reporter: Steven Schlansker


We run Kafka Streams with RocksDB state stores on ephemeral disks (i.e. if an 
instance dies it is created from scratch, rather than reusing the existing 
RocksDB.)

We routinely see:
{code:java}
2018-04-09T19:14:35.004Z WARN <> 
[chat-0319e3c3-d8b2-4c60-bd69-a8484d8d4435-StreamThread-1] 
o.a.k.s.p.i.ProcessorStateManager - task [0_11] Failed to write offset 
checkpoint file to /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint: {}
java.io.FileNotFoundException: 
/mnt/mesos/sandbox/storage/chat/0_11/.checkpoint.tmp (No such file or directory)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.(FileOutputStream.java:213)
at java.io.FileOutputStream.(FileOutputStream.java:162)
at 
org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:78)
at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:320)
at 
org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:314)
at 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307)
at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297)
at 
org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
at 
org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357)
at 
org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347)
at 
org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403)
at 
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720){code}
Inspecting the state store directory, I can indeed see that {{chat/0_11}} does 
not exist (although many other partitions do).

 

Looking at the OffsetCheckpoint write method, it seems to try to open a new 
checkpoint file without first ensuring that the parent directory exists.

 
{code:java}
    public void write(final Map offsets) throws 
IOException {
    // if there is no offsets, skip writing the file to save disk IOs
    if (offsets.isEmpty()) {
    return;
    }

    synchronized (lock) {
    // write to temp file and then swap with the existing file
    final File temp = new File(file.getAbsolutePath() + ".tmp");{code}
 

Either the OffsetCheckpoint class should initialize the directories if needed, 
or some precondition of it being called should ensure that is the case.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4787) KafkaStreams close() is not reentrant

2017-08-29 Thread Steven Schlansker (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16145979#comment-16145979
 ] 

Steven Schlansker commented on KAFKA-4787:
--

Thanks for following up.  I inspected the code as of 0.11.0.0 and it looks like 
it's much better now (joining happens on a background thread).  Will report 
back if I observe further deadlocks.

> KafkaStreams close() is not reentrant
> -
>
> Key: KAFKA-4787
> URL: https://issues.apache.org/jira/browse/KAFKA-4787
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Steven Schlansker
>
> While building a simple application, I tried to implement a failure policy 
> where any uncaught exception terminates the application until an 
> administrator can evaluate and intervene:
> {code}
> /** Handle any uncaught exception by shutting down the program. */
> private void handleStreamException(Thread thread, Throwable t) {
> LOG.error("stream exception in thread {}", thread, t);
> streams.close();
> }
> streams.setUncaughtExceptionHandler(this::handleStreamException);
> streams.start();
> {code}
> Unfortunately, because the KafkaStreams#close() method takes a lock, this is 
> prone to what looks like a deadlock:
> {code}
> "StreamThread-1" #80 prio=5 os_prio=0 tid=0x7f56096f4000 nid=0x40c8 
> waiting for monitor entry [0x7f54f03ee000]
>java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java)
> - waiting to lock <0xf171cda8> (a 
> org.apache.kafka.streams.KafkaStreams)
> at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:438)
> at 
> com.opentable.chat.service.ChatStorage$$Lambda$161/1940967023.close(Unknown 
> Source)
> at 
> com.opentable.chat.service.ChatStorage.closeLog(ChatStorage.java:212)
> at com.opentable.chat.service.ChatStorage.close(ChatStorage.java:207)
> at 
> com.opentable.chat.service.ChatStorage.handleStreamException(ChatStorage.java:541)
> at 
> com.opentable.chat.service.ChatStorage$$Lambda$123/149062221.uncaughtException(Unknown
>  Source)
> at java.lang.Thread.dispatchUncaughtException(Thread.java:1956)
> "main" #1 prio=5 os_prio=0 tid=0x7f5608011000 nid=0x3f76 in Object.wait() 
> [0x7f5610f04000]
>java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.Thread.join(Thread.java:1249)
> - locked <0xfd302bf0> (a java.lang.Thread)
> at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:494)
> - locked <0xf171cda8> (a 
> org.apache.kafka.streams.KafkaStreams)
> at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:438)
> at 
> com.opentable.chat.service.ChatStorage$$Lambda$161/1940967023.close(Unknown 
> Source)
> at 
> com.opentable.chat.service.ChatStorage.closeLog(ChatStorage.java:212)
> at com.opentable.chat.service.ChatStorage.close(ChatStorage.java:207)
> {code}
> Note how the main thread calls close(), which encounters an exception.  It 
> uses a StreamThread to dispatch to the handler, which calls close().  Once it 
> tries to take the monitor, we are left in a position where main is joined on 
> StreamThread-1, but StreamThread-1 is waiting for main to release that 
> monitor.
> Arguably it's a bit abusive to call close() in this way (it certainly wasn't 
> intentional) -- but to make Kafka Streams robust it should handle any 
> sequence of close() invocations in particular gracefully.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)