[jira] [Resolved] (KAFKA-14942) CopyOnWriteMap implements ConcurrentMap but does not implement required default methods
[ https://issues.apache.org/jira/browse/KAFKA-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Schlansker resolved KAFKA-14942. --- Resolution: Invalid > CopyOnWriteMap implements ConcurrentMap but does not implement required > default methods > --- > > Key: KAFKA-14942 > URL: https://issues.apache.org/jira/browse/KAFKA-14942 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.4.0 >Reporter: Steven Schlansker >Priority: Minor > > Hi Kafka team, > I was reading through the kafka-clients CopyOnWriteMap while investigating a > problem in a different library, and I think it is declaring that it is a > ConcurrentMap but does not completely implement that interface. > In particular, it inherits e.g. computeIfAbsent as a default method from Map, > which is noted to be a non-atomic implementation, and is not synchronized in > any way. I think this can lead to a reader experiencing a map whose contents > are not consistent with any serial execution of write ops. > > Consider a thread T1 which calls computeIfAbsent("a", _ -> "1") > T1 computeIfAbsent calls get("a") and observes null, and is then pre-empted > T2 calls put("a", "2"), which copies the (empty) backing map and stores > \{"a": "2"} > T1 computeIfAbsent then wakes up, still thinking the value is null, and calls > put("a", "1"). > > This leads to the map finishing with the contents \{"a":"1"}, while any > serial execution of these two operations should always finish with \{"a":"2"}. > > I think CopyOnWriteMap should re-implement all mutating default > methods, at least as synchronized. Alternatively, if this is a special internal map and we > know those will never be called, perhaps they should throw > UnsupportedOperationException, or the class should at least be documented as not a > complete and proper implementation. > > Thank you for your consideration. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14942) CopyOnWriteMap implements ConcurrentMap but does not implement required default methods
Steven Schlansker created KAFKA-14942: - Summary: CopyOnWriteMap implements ConcurrentMap but does not implement required default methods Key: KAFKA-14942 URL: https://issues.apache.org/jira/browse/KAFKA-14942 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 3.4.0 Reporter: Steven Schlansker Hi Kafka team, I was reading through the kafka-clients CopyOnWriteMap while investigating a problem in a different library, and I think it is declaring that it is a ConcurrentMap but does not completely implement that interface. In particular, it inherits e.g. computeIfAbsent as a default method from Map, which is noted to be a non-atomic implementation, and is not synchronized in any way. I think this can lead to a reader experiencing a map whose contents are not consistent with any serial execution of write ops. Consider a thread T1 which calls computeIfAbsent("a", _ -> "1") T1 computeIfAbsent calls get("a") and observes null, and is then pre-empted T2 calls put("a", "2"), which copies the (empty) backing map and stores \{"a": "2"} T1 computeIfAbsent then wakes up, still thinking the value is null, and calls put("a", "1"). This leads to the map finishing with the contents \{"a":"1"}, while any serial execution of these two operations should always finish with \{"a":"2"}. I think CopyOnWriteMap should re-implement all mutating default methods, at least as synchronized. Alternatively, if this is a special internal map and we know those will never be called, perhaps they should throw UnsupportedOperationException, or the class should at least be documented as not a complete and proper implementation. Thank you for your consideration. -- This message was sent by Atlassian Jira (v8.20.10#820010)
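To make the suggested remedy concrete, here is a minimal sketch of a copy-on-write map that re-implements computeIfAbsent as a synchronized method. This is hypothetical illustration code, not the actual org.apache.kafka.common.utils.CopyOnWriteMap:
{code:java}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch only: re-implement the non-atomic Map default as a synchronized
// method so it shares a monitor with the other mutating operations.
public class SyncCopyOnWriteMap<K, V> {
    private volatile Map<K, V> map = Collections.emptyMap();

    public V get(K key) {
        return map.get(key); // lock-free read of the current snapshot
    }

    public synchronized V put(K key, V value) {
        Map<K, V> copy = new HashMap<>(map); // copy-on-write
        V prev = copy.put(key, value);
        map = Collections.unmodifiableMap(copy);
        return prev;
    }

    // Holding the monitor makes the get-then-put sequence atomic with respect
    // to put(), so T2's put("a", "2") can no longer interleave between T1's
    // null check and T1's put("a", "1").
    public synchronized V computeIfAbsent(K key, Function<? super K, ? extends V> fn) {
        V existing = map.get(key);
        if (existing != null) {
            return existing;
        }
        V computed = fn.apply(key);
        if (computed != null) {
            put(key, computed);
        }
        return computed;
    }
}
{code}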
[jira] [Resolved] (KAFKA-13593) ThrottledChannelReaper slows broker shutdown by multiple seconds
[ https://issues.apache.org/jira/browse/KAFKA-13593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Schlansker resolved KAFKA-13593. --- Resolution: Fixed > ThrottledChannelReaper slows broker shutdown by multiple seconds > > > Key: KAFKA-13593 > URL: https://issues.apache.org/jira/browse/KAFKA-13593 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 3.0.0 >Reporter: Steven Schlansker >Priority: Minor > > We run an embedded KRaft broker in integration tests, to test that our > Producer / Consumers are all hooked up and verify our overall pipeline. > While trying to deliver CI speed improvements, we noticed that the majority > of time for a small test is actually spent in Kafka broker shutdown. From the > logs, it looks like the ClientQuotaManager has multiple > ThrottledChannelReaper threads and each of them takes up to a second to > shut down. > {code:java} > 2022-01-12T15:26:31.932Z [main] INFO kafka.log.LogManager - Shutdown > complete. > 2022-01-12T15:26:31.934Z [main] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-Fetch]: Shutting down > 2022-01-12T15:26:32.311Z [main] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-Fetch]: Shutdown completed > 2022-01-12T15:26:32.311Z [ThrottledChannelReaper-Fetch] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-Fetch]: Stopped > 2022-01-12T15:26:32.311Z [main] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-Produce]: Shutting down > 2022-01-12T15:26:33.312Z [ThrottledChannelReaper-Produce] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-Produce]: Stopped > 2022-01-12T15:26:33.312Z [main] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-Produce]: Shutdown completed > 2022-01-12T15:26:33.312Z [main] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-Request]: Shutting down > 2022-01-12T15:26:34.315Z [ThrottledChannelReaper-Request] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-Request]: Stopped > 2022-01-12T15:26:34.315Z [main] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-Request]: Shutdown completed > 2022-01-12T15:26:34.315Z [main] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-ControllerMutation]: Shutting down > 2022-01-12T15:26:35.317Z [main] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-ControllerMutation]: Shutdown completed > 2022-01-12T15:26:35.317Z [ThrottledChannelReaper-ControllerMutation] INFO > k.s.ClientQuotaManager$ThrottledChannelReaper - > [ThrottledChannelReaper-ControllerMutation]: Stopped{code} > Inspecting the code, the ThrottledChannelReaper threads are marked as not > interruptible, so ShutdownableThread does not interrupt the worker on > shutdown. Unfortunately, the doWork method polls with a 1 second timeout. So > you wait up to 1s for every type of quota, in this case for a total of almost > 4s. > > While this is not a problem for production Kafka brokers, where instances are > expected to stay up for months, in tests that expect to spin up and down it > is easily noticed and adds real overhead to CI. 
> > Suggested possible remediations: > * Allow ThrottledChannelReaper to be interrupted by ShutdownableThread > * ThrottledChannelReaper overrides initiateShutdown and after calling > {{super.initiateShutdown}} then enqueues a {{null}} element on the delayQueue > to force the worker thread to notice the shutdown state > * Reduce 1s poll timeout to something small (carries overhead penalty for > all users though, so this is less desirable), or make it configurable so we > can set it to e.g. 10ms in unit tests -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13593) ThrottledChannelReaper slows broker shutdown by multiple seconds
Steven Schlansker created KAFKA-13593: - Summary: ThrottledChannelReaper slows broker shutdown by multiple seconds Key: KAFKA-13593 URL: https://issues.apache.org/jira/browse/KAFKA-13593 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 3.0.0 Reporter: Steven Schlansker We run an embedded KRaft broker in integration tests, to test that our Producer / Consumers are all hooked up and verify our overall pipeline. While trying to deliver CI speed improvements, we noticed that the majority of time for a small test is actually spent in Kafka broker shutdown. From the logs, it looks like the ClientQuotaManager has multiple ThrottledChannelReaper threads and each of them takes up to a second to shut down. {code:java} 2022-01-12T15:26:31.932Z [main] INFO kafka.log.LogManager - Shutdown complete. 2022-01-12T15:26:31.934Z [main] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Fetch]: Shutting down 2022-01-12T15:26:32.311Z [main] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Fetch]: Shutdown completed 2022-01-12T15:26:32.311Z [ThrottledChannelReaper-Fetch] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Fetch]: Stopped 2022-01-12T15:26:32.311Z [main] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Produce]: Shutting down 2022-01-12T15:26:33.312Z [ThrottledChannelReaper-Produce] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Produce]: Stopped 2022-01-12T15:26:33.312Z [main] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Produce]: Shutdown completed 2022-01-12T15:26:33.312Z [main] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Request]: Shutting down 2022-01-12T15:26:34.315Z [ThrottledChannelReaper-Request] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Request]: Stopped 2022-01-12T15:26:34.315Z [main] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Request]: Shutdown completed 2022-01-12T15:26:34.315Z [main] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-ControllerMutation]: Shutting down 2022-01-12T15:26:35.317Z [main] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-ControllerMutation]: Shutdown completed 2022-01-12T15:26:35.317Z [ThrottledChannelReaper-ControllerMutation] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-ControllerMutation]: Stopped{code} Inspecting the code, the ThrottledChannelReaper threads are marked as not interruptible, so ShutdownableThread does not interrupt the worker on shutdown. Unfortunately, the doWork method polls with a 1 second timeout. So you wait up to 1s for every type of quota, in this case for a total of almost 4s. While this is not a problem for production Kafka brokers, where instances are expected to stay up for months, in tests that expect to spin up and down it is easily noticed and adds real overhead to CI. Suggested possible remediations: * Allow ThrottledChannelReaper to be interrupted by ShutdownableThread * ThrottledChannelReaper overrides initiateShutdown and after calling {{super.initiateShutdown}} then enqueues a {{null}} element on the delayQueue to force the worker thread to notice the shutdown state * Reduce 1s poll timeout to something small (carries overhead penalty for all users though, so this is less desirable), or make it configurable so we can set it to e.g. 10ms in unit tests -- This message was sent by Atlassian Jira (v8.20.1#820001)
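A sketch of the second remediation in illustrative Java (the real ThrottledChannelReaper is an inner class of kafka.server.ClientQuotaManager extending ShutdownableThread, so all names here are hypothetical). Since DelayQueue rejects literal nulls, an already-expired sentinel element stands in for the null suggested above:
{code:java}
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Illustrative reaper: initiateShutdown() wakes the worker immediately
// instead of letting it wait out the full 1s poll timeout.
public class ReaperSketch extends Thread {

    // Sentinel that is already expired, so a blocked poll() returns at once.
    static final class ShutdownSentinel implements Delayed {
        @Override
        public long getDelay(TimeUnit unit) {
            return 0;
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(0, other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    private final DelayQueue<Delayed> delayQueue = new DelayQueue<>();
    private volatile boolean running = true;

    public void initiateShutdown() {
        running = false;
        delayQueue.add(new ShutdownSentinel()); // unblock poll() immediately
    }

    @Override
    public void run() {
        while (running) {
            try {
                Delayed expired = delayQueue.poll(1, TimeUnit.SECONDS);
                if (expired != null && !(expired instanceof ShutdownSentinel)) {
                    // ... unthrottle the expired channel ...
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
{code}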
[jira] [Created] (KAFKA-6767) OffsetCheckpoint write assumes parent directory exists
Steven Schlansker created KAFKA-6767: Summary: OffsetCheckpoint write assumes parent directory exists Key: KAFKA-6767 URL: https://issues.apache.org/jira/browse/KAFKA-6767 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.1.0 Reporter: Steven Schlansker We run Kafka Streams with RocksDB state stores on ephemeral disks (i.e. if an instance dies it is created from scratch, rather than reusing the existing RocksDB.) We routinely see: {code:java} 2018-04-09T19:14:35.004Z WARN <> [chat-0319e3c3-d8b2-4c60-bd69-a8484d8d4435-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager - task [0_11] Failed to write offset checkpoint file to /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint: {} java.io.FileNotFoundException: /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint.tmp (No such file or directory) at java.io.FileOutputStream.open0(Native Method) at java.io.FileOutputStream.open(FileOutputStream.java:270) at java.io.FileOutputStream.<init>(FileOutputStream.java:213) at java.io.FileOutputStream.<init>(FileOutputStream.java:162) at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:78) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:320) at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:314) at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307) at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297) at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67) at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357) at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347) at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403) at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720){code} Inspecting the state store directory, I can indeed see that {{chat/0_11}} does not exist (although many other partitions do). Looking at the OffsetCheckpoint write method, it seems to try to open a new checkpoint file without first ensuring that the parent directory exists. {code:java} public void write(final Map<TopicPartition, Long> offsets) throws IOException { // if there is no offsets, skip writing the file to save disk IOs if (offsets.isEmpty()) { return; } synchronized (lock) { // write to temp file and then swap with the existing file final File temp = new File(file.getAbsolutePath() + ".tmp");{code} Either the OffsetCheckpoint class should initialize the directories if needed, or some precondition of it being called should ensure that is the case. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
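A minimal sketch of the first option: create the parent directory, if missing, before opening the temp file. This is hypothetical code, not the actual Kafka patch; Map<String, Long> stands in for the real Map<TopicPartition, Long> to keep the sketch self-contained:
{code:java}
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Map;

// Sketch of a checkpoint writer that ensures its parent directory exists.
public class CheckpointWriteSketch {
    private final File file;
    private final Object lock = new Object();

    public CheckpointWriteSketch(File file) {
        this.file = file;
    }

    public void write(final Map<String, Long> offsets) throws IOException {
        // if there is no offsets, skip writing the file to save disk IOs
        if (offsets.isEmpty()) {
            return;
        }
        synchronized (lock) {
            // The missing precondition: e.g. chat/0_11 may not exist yet.
            final File dir = file.getParentFile();
            if (dir != null && !dir.exists() && !dir.mkdirs()) {
                throw new IOException("Cannot create checkpoint directory " + dir);
            }
            // write to temp file and then swap with the existing file
            final File temp = new File(file.getAbsolutePath() + ".tmp");
            try (FileOutputStream out = new FileOutputStream(temp)) {
                // ... serialize offsets, flush, and fsync ...
            }
            if (!temp.renameTo(file)) {
                throw new IOException("Cannot rename " + temp + " to " + file);
            }
        }
    }
}
{code}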
[jira] [Commented] (KAFKA-4829) Improve logging of StreamTask commits
[ https://issues.apache.org/jira/browse/KAFKA-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050793#comment-16050793 ] Steven Schlansker commented on KAFKA-4829: -- This is talking about application logs; the link seems to refer to database logs, not at all the same thing. Or am I missing something? > Improve logging of StreamTask commits > - > > Key: KAFKA-4829 > URL: https://issues.apache.org/jira/browse/KAFKA-4829 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Steven Schlansker >Priority: Minor > Labels: user-experience > > Currently I see this every commit interval: > {code} > 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] > o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing > task StreamTask 1_31 > 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] > o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing > task StreamTask 2_31 > {code} > We have ~10 tasks in our topology, 4 topics, and 32 partitions per topic. > This means every commit interval we log a few hundred lines of the above > which is an order of magnitude chattier than anything else in the log > during normal operations. > To improve visibility of important messages, we should reduce the chattiness > of normal commits and highlight abnormal commits. An example proposal: > existing message is fine at TRACE level for diagnostics > {{TRACE o.a.k.s.p.i.StreamThread - Committing task StreamTask 1_31}} > normal fast case, wrap them all up into one summary line > {{INFO o.a.k.s.p.i.StreamThreads - 64 stream tasks committed in 25ms}} > some kind of threshold / messaging in case it doesn't complete quickly or > logs an exception > {{ERROR o.a.k.s.p.i.StreamThread - StreamTask 1_32 did not commit in 100ms}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-4829) Improve logging of StreamTask commits
[ https://issues.apache.org/jira/browse/KAFKA-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050793#comment-16050793 ] Steven Schlansker edited comment on KAFKA-4829 at 6/15/17 5:10 PM: --- This ticket is talking about application logs; the link seems to refer to database logs, not at all the same thing. Or am I missing something? was (Author: stevenschlansker): This is talking about application logs; the link seems to refer to database logs, not at all the same thing. Or am I missing something? > Improve logging of StreamTask commits > - > > Key: KAFKA-4829 > URL: https://issues.apache.org/jira/browse/KAFKA-4829 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Steven Schlansker >Priority: Minor > Labels: user-experience > > Currently I see this every commit interval: > {code} > 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] > o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing > task StreamTask 1_31 > 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] > o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing > task StreamTask 2_31 > {code} > We have ~10 tasks in our topology, 4 topics, and 32 partitions per topic. > This means every commit interval we log a few hundred lines of the above > which is an order of magnitude chattier than anything else in the log > during normal operations. > To improve visibility of important messages, we should reduce the chattiness > of normal commits and highlight abnormal commits. An example proposal: > existing message is fine at TRACE level for diagnostics > {{TRACE o.a.k.s.p.i.StreamThread - Committing task StreamTask 1_31}} > normal fast case, wrap them all up into one summary line > {{INFO o.a.k.s.p.i.StreamThreads - 64 stream tasks committed in 25ms}} > some kind of threshold / messaging in case it doesn't complete quickly or > logs an exception > {{ERROR o.a.k.s.p.i.StreamThread - StreamTask 1_32 did not commit in 100ms}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-1313) Support adding replicas to existing topic partitions via kafka-topics tool without manually setting broker assignments
[ https://issues.apache.org/jira/browse/KAFKA-1313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15993414#comment-15993414 ] Steven Schlansker commented on KAFKA-1313: -- I started using Kafka Streams recently. I did not know to configure the {{replication.factor}} tuneable and so now all of my automatically generated topics have the wrong replication factor. I tried to update via {{kafka-topics.sh}} and obviously ended up here. I understand why this got deprioritized, but consider now that in addition to an administrator creating topics (where they have an opportunity to set replication factor right), Kafka Streams creates topics behind your back and you may not realize your replication factor is wrong until you have a lot of existing data. I can obviously fix it up by hand as outlined above, but this is a pretty big wart and it seems well worth fixing. > Support adding replicas to existing topic partitions via kafka-topics tool > without manually setting broker assignments > -- > > Key: KAFKA-1313 > URL: https://issues.apache.org/jira/browse/KAFKA-1313 > Project: Kafka > Issue Type: New Feature > Components: tools >Affects Versions: 0.8.0 >Reporter: Marc Labbe >Assignee: Sreepathi Prasanna > Labels: newbie++ > > There is currently no easy way to add replicas to existing topic > partitions. > For example, topic create-test has been created with ReplicationFactor=1: > Topic:create-test PartitionCount:3 ReplicationFactor:1 Configs: > Topic: create-test Partition: 0 Leader: 1 Replicas: 1 Isr: 1 > Topic: create-test Partition: 1 Leader: 2 Replicas: 2 Isr: 2 > Topic: create-test Partition: 2 Leader: 3 Replicas: 3 Isr: 3 > I would like to increase the ReplicationFactor=2 (or more) so it shows up > like this instead. > Topic:create-test PartitionCount:3 ReplicationFactor:2 Configs: > Topic: create-test Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2 > Topic: create-test Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3 > Topic: create-test Partition: 2 Leader: 3 Replicas: 3,1 Isr: 3,1 > Use cases for this: > - adding brokers and thus increase fault tolerance > - fixing human errors for topics created with wrong values -- This message was sent by Atlassian JIRA (v6.3.15#6346)
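For reference, the by-hand fix alluded to above goes through the partition reassignment tool with an explicit replica list per partition. A sketch for the example topic (broker ids and the ZooKeeper connection string are illustrative):
{code}
# increase-replication-factor.json -- one entry per partition, listing the
# desired replica set:
{"version":1,"partitions":[
  {"topic":"create-test","partition":0,"replicas":[1,2]},
  {"topic":"create-test","partition":1,"replicas":[2,3]},
  {"topic":"create-test","partition":2,"replicas":[3,1]}
]}

# then hand it to the reassignment tool:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file increase-replication-factor.json --execute
{code}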
[jira] [Updated] (KAFKA-4829) Improve logging of StreamTask commits
[ https://issues.apache.org/jira/browse/KAFKA-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Schlansker updated KAFKA-4829: - Description: Currently I see this every commit interval: {code} 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing task StreamTask 1_31 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing task StreamTask 2_31 {code} We have ~10 tasks in our topology, 4 topics, and 32 partitions per topic. This means every commit interval we log a few hundred lines of the above which is an order of magnitude chattier than anything else in the log during normal operations. To improve visibility of important messages, we should reduce the chattiness of normal commits and highlight abnormal commits. An example proposal: existing message is fine at TRACE level for diagnostics {{TRACE o.a.k.s.p.i.StreamThread - Committing task StreamTask 1_31}} normal fast case, wrap them all up into one summary line {{INFO o.a.k.s.p.i.StreamThreads - 64 stream tasks committed in 25ms}} some kind of threshold / messaging in case it doesn't complete quickly or logs an exception {{ERROR o.a.k.s.p.i.StreamThread - StreamTask 1_32 did not commit in 100ms}} was: Currently I see this every commit interval: {code} 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing task StreamTask 1_31 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing task StreamTask 2_31 {code} We have ~10 tasks in our topology, 4 topics, and 32 partitions per topic. This means every commit interval we log a few hundred lines of the above which is an order of magnitude chattier than anything else in the log during normal operations. To improve visibility of important messages, we should reduce the chattiness of normal commits and highlight abnormal commits. An example proposal: existing message is fine at TRACE level for diagnostics {{TRACE o.a.k.s.p.i.StreamThread - Committing task StreamTask 1_31}} normal fast case, wrap them all up into one summary line {{INFO o.a.k.s.p.i.StreamThreads - 64 stream tasks committed in 25ms}} some kind of threshold / messaging in case it doesn't complete quickly or logs an exception {{ERROR o.a.k.s.p.i.StreamThread - StreamTask 1_32 did not commit in 100ms}} Thoughts? > Improve logging of StreamTask commits > - > > Key: KAFKA-4829 > URL: https://issues.apache.org/jira/browse/KAFKA-4829 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Steven Schlansker >Priority: Minor > > Currently I see this every commit interval: > {code} > 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] > o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing > task StreamTask 1_31 > 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] > o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing > task StreamTask 2_31 > {code} > We have ~10 tasks in our topology, 4 topics, and 32 partitions per topic. > This means every commit interval we log a few hundred lines of the above > which is an order of magnitude chattier than anything else in the log > during normal operations. > To improve visibility of important messages, we should reduce the chattiness > of normal commits and highlight abnormal commits. 
An example proposal: > existing message is fine at TRACE level for diagnostics > {{TRACE o.a.k.s.p.i.StreamThread - Committing task StreamTask 1_31}} > normal fast case, wrap them all up into one summary line > {{INFO o.a.k.s.p.i.StreamThreads - 64 stream tasks committed in 25ms}} > some kind of threshold / messaging in case it doesn't complete quickly or > logs an exception > {{ERROR o.a.k.s.p.i.StreamThread - StreamTask 1_32 did not commit in 100ms}} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-4829) Improve logging of StreamTask commits
Steven Schlansker created KAFKA-4829: Summary: Improve logging of StreamTask commits Key: KAFKA-4829 URL: https://issues.apache.org/jira/browse/KAFKA-4829 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 0.10.2.0 Reporter: Steven Schlansker Priority: Minor Currently I see this every commit interval: 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing task StreamTask 1_31 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing task StreamTask 2_31 We have ~10 tasks in our topology, 4 topics, and 32 partitions per topic. This means every commit interval we log a few hundred lines of the above which is an order of magnitude chattier than anything else in the log during normal operations. To improve visibility of important messages, we should reduce the chattiness of normal commits and highlight abnormal commits. An example proposal: existing message is fine at TRACE level for diagnostics {{TRACE o.a.k.s.p.i.StreamThread - Committing task StreamTask 1_31}} normal fast case, wrap them all up into one summary line {{INFO o.a.k.s.p.i.StreamThreads - 64 stream tasks committed in 25ms}} some kind of threshold / messaging in case it doesn't complete quickly or logs an exception {{ERROR o.a.k.s.p.i.StreamThread - StreamTask 1_32 did not commit in 100ms}} Thoughts? -- This message was sent by Atlassian JIRA (v6.3.15#6346)
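A sketch of what the proposed logging shape could look like in code (class name, method, and threshold are hypothetical; slf4j as used by Kafka Streams):
{code:java}
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch: per-task commit messages at TRACE, one INFO summary per commit
// interval, and an ERROR when an individual commit exceeds a threshold.
public class CommitLoggingSketch {
    private static final Logger LOG = LoggerFactory.getLogger(CommitLoggingSketch.class);
    private static final long SLOW_COMMIT_MS = 100; // hypothetical threshold

    void commitAll(List<String> taskIds) {
        final long start = System.currentTimeMillis();
        for (String taskId : taskIds) {
            LOG.trace("Committing task StreamTask {}", taskId);
            final long taskStart = System.currentTimeMillis();
            // ... task.commit() ...
            final long taskElapsed = System.currentTimeMillis() - taskStart;
            if (taskElapsed > SLOW_COMMIT_MS) {
                LOG.error("StreamTask {} did not commit in {}ms", taskId, SLOW_COMMIT_MS);
            }
        }
        LOG.info("{} stream tasks committed in {}ms",
                taskIds.size(), System.currentTimeMillis() - start);
    }
}
{code}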
[jira] [Updated] (KAFKA-4829) Improve logging of StreamTask commits
[ https://issues.apache.org/jira/browse/KAFKA-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Schlansker updated KAFKA-4829: - Description: Currently I see this every commit interval: {code} 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing task StreamTask 1_31 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing task StreamTask 2_31 {code} We have ~10 tasks in our topology, 4 topics, and 32 partitions per topic. This means every commit interval we log a few hundred lines of the above which is an order of magnitude chattier than anything else in the log during normal operations. To improve visibility of important messages, we should reduce the chattiness of normal commits and highlight abnormal commits. An example proposal: existing message is fine at TRACE level for diagnostics {{TRACE o.a.k.s.p.i.StreamThread - Committing task StreamTask 1_31}} normal fast case, wrap them all up into one summary line {{INFO o.a.k.s.p.i.StreamThreads - 64 stream tasks committed in 25ms}} some kind of threshold / messaging in case it doesn't complete quickly or logs an exception {{ERROR o.a.k.s.p.i.StreamThread - StreamTask 1_32 did not commit in 100ms}} Thoughts? was: Currently I see this every commit interval: 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing task StreamTask 1_31 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing task StreamTask 2_31 We have ~10 tasks in our topology, 4 topics, and 32 partitions per topic. This means every commit interval we log a few hundred lines of the above which is an order of magnitude chattier than anything else in the log during normal operations. To improve visibility of important messages, we should reduce the chattiness of normal commits and highlight abnormal commits. An example proposal: existing message is fine at TRACE level for diagnostics {{TRACE o.a.k.s.p.i.StreamThread - Committing task StreamTask 1_31}} normal fast case, wrap them all up into one summary line {{INFO o.a.k.s.p.i.StreamThreads - 64 stream tasks committed in 25ms}} some kind of threshold / messaging in case it doesn't complete quickly or logs an exception {{ERROR o.a.k.s.p.i.StreamThread - StreamTask 1_32 did not commit in 100ms}} Thoughts? > Improve logging of StreamTask commits > - > > Key: KAFKA-4829 > URL: https://issues.apache.org/jira/browse/KAFKA-4829 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Steven Schlansker >Priority: Minor > > Currently I see this every commit interval: > {code} > 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] > o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing > task StreamTask 1_31 > 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] > o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing > task StreamTask 2_31 > {code} > We have ~10 tasks in our topology, 4 topics, and 32 partitions per topic. > This means every commit interval we log a few hundred lines of the above > which is an order of magnitude chattier than anything else in the log > during normal operations. > To improve visibility of important messages, we should reduce the chattiness > of normal commits and highlight abnormal commits. 
An example proposal: > existing message is fine at TRACE level for diagnostics > {{TRACE o.a.k.s.p.i.StreamThread - Committing task StreamTask 1_31}} > normal fast case, wrap them all up into one summary line > {{INFO o.a.k.s.p.i.StreamThreads - 64 stream tasks committed in 25ms}} > some kind of threshold / messaging in case it doesn't complete quickly > or logs an exception > {{ERROR o.a.k.s.p.i.StreamThread - StreamTask 1_32 did not commit in 100ms}} > Thoughts? -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-4787) KafkaStreams close() is not reentrant
[ https://issues.apache.org/jira/browse/KAFKA-4787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879336#comment-15879336 ] Steven Schlansker commented on KAFKA-4787: -- I'm not sure I understand why that should behave differently. Replacing the lambda expression ({{this::handleStreamException}}) with your anonymous subclass implementation should be equivalent with regards to taking the {{synchronized}} monitor of the KafkaStreams object inside the {{public synchronized void close(long, TimeUnit)}} method. I will test it anyway though and see if it in fact does behave differently. > KafkaStreams close() is not reentrant > - > > Key: KAFKA-4787 > URL: https://issues.apache.org/jira/browse/KAFKA-4787 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Steven Schlansker > > While building a simple application, I tried to implement a failure policy > where any uncaught exception terminates the application until an > administrator can evaluate and intervene: > {code} > /** Handle any uncaught exception by shutting down the program. */ > private void handleStreamException(Thread thread, Throwable t) { > LOG.error("stream exception in thread {}", thread, t); > streams.close(); > } > streams.setUncaughtExceptionHandler(this::handleStreamException); > streams.start(); > {code} > Unfortunately, because the KafkaStreams#close() method takes a lock, this is > prone to what looks like a deadlock: > {code} > "StreamThread-1" #80 prio=5 os_prio=0 tid=0x7f56096f4000 nid=0x40c8 > waiting for monitor entry [0x7f54f03ee000] >java.lang.Thread.State: BLOCKED (on object monitor) > at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java) > - waiting to lock <0xf171cda8> (a > org.apache.kafka.streams.KafkaStreams) > at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:438) > at > com.opentable.chat.service.ChatStorage$$Lambda$161/1940967023.close(Unknown > Source) > at > com.opentable.chat.service.ChatStorage.closeLog(ChatStorage.java:212) > at com.opentable.chat.service.ChatStorage.close(ChatStorage.java:207) > at > com.opentable.chat.service.ChatStorage.handleStreamException(ChatStorage.java:541) > at > com.opentable.chat.service.ChatStorage$$Lambda$123/149062221.uncaughtException(Unknown > Source) > at java.lang.Thread.dispatchUncaughtException(Thread.java:1956) > "main" #1 prio=5 os_prio=0 tid=0x7f5608011000 nid=0x3f76 in Object.wait() > [0x7f5610f04000] >java.lang.Thread.State: WAITING (on object monitor) > at java.lang.Object.wait(Native Method) > at java.lang.Thread.join(Thread.java:1249) > - locked <0xfd302bf0> (a java.lang.Thread) > at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:494) > - locked <0xf171cda8> (a > org.apache.kafka.streams.KafkaStreams) > at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:438) > at > com.opentable.chat.service.ChatStorage$$Lambda$161/1940967023.close(Unknown > Source) > at > com.opentable.chat.service.ChatStorage.closeLog(ChatStorage.java:212) > at com.opentable.chat.service.ChatStorage.close(ChatStorage.java:207) > {code} > Note how the main thread calls close(), which encounters an exception. It > uses a StreamThread to dispatch to the handler, which calls close(). Once it > tries to take the monitor, we are left in a position where main is joined on > StreamThread-1, but StreamThread-1 is waiting for main to release that > monitor. 
> Arguably it's a bit abusive to call close() in this way (it certainly wasn't > intentional) -- but to make Kafka Streams robust it should handle any > sequence of close() invocations in particular gracefully. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-4787) KafkaStreams close() is not reentrant
[ https://issues.apache.org/jira/browse/KAFKA-4787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879019#comment-15879019 ] Steven Schlansker commented on KAFKA-4787: -- A simple solution: have further close() calls skip the inner synchronized close overload and instead join on the already-existing close request. > KafkaStreams close() is not reentrant > - > > Key: KAFKA-4787 > URL: https://issues.apache.org/jira/browse/KAFKA-4787 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Steven Schlansker > > While building a simple application, I tried to implement a failure policy > where any uncaught exception terminates the application until an > administrator can evaluate and intervene: > {code} > /** Handle any uncaught exception by shutting down the program. */ > private void handleStreamException(Thread thread, Throwable t) { > LOG.error("stream exception in thread {}", thread, t); > streams.close(); > } > streams.setUncaughtExceptionHandler(this::handleStreamException); > streams.start(); > {code} > Unfortunately, because the KafkaStreams#close() method takes a lock, this is > prone to what looks like a deadlock: > {code} > "StreamThread-1" #80 prio=5 os_prio=0 tid=0x7f56096f4000 nid=0x40c8 > waiting for monitor entry [0x7f54f03ee000] >java.lang.Thread.State: BLOCKED (on object monitor) > at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java) > - waiting to lock <0xf171cda8> (a > org.apache.kafka.streams.KafkaStreams) > at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:438) > at > com.opentable.chat.service.ChatStorage$$Lambda$161/1940967023.close(Unknown > Source) > at > com.opentable.chat.service.ChatStorage.closeLog(ChatStorage.java:212) > at com.opentable.chat.service.ChatStorage.close(ChatStorage.java:207) > at > com.opentable.chat.service.ChatStorage.handleStreamException(ChatStorage.java:541) > at > com.opentable.chat.service.ChatStorage$$Lambda$123/149062221.uncaughtException(Unknown > Source) > at java.lang.Thread.dispatchUncaughtException(Thread.java:1956) > "main" #1 prio=5 os_prio=0 tid=0x7f5608011000 nid=0x3f76 in Object.wait() > [0x7f5610f04000] >java.lang.Thread.State: WAITING (on object monitor) > at java.lang.Object.wait(Native Method) > at java.lang.Thread.join(Thread.java:1249) > - locked <0xfd302bf0> (a java.lang.Thread) > at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:494) > - locked <0xf171cda8> (a > org.apache.kafka.streams.KafkaStreams) > at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:438) > at > com.opentable.chat.service.ChatStorage$$Lambda$161/1940967023.close(Unknown > Source) > at > com.opentable.chat.service.ChatStorage.closeLog(ChatStorage.java:212) > at com.opentable.chat.service.ChatStorage.close(ChatStorage.java:207) > {code} > Note how the main thread calls close(), which encounters an exception. It > uses a StreamThread to dispatch to the handler, which calls close(). Once it > tries to take the monitor, we are left in a position where main is joined on > StreamThread-1, but StreamThread-1 is waiting for main to release that > monitor. > Arguably it's a bit abusive to call close() in this way (it certainly wasn't > intentional) -- but to make Kafka Streams robust it should handle any > sequence of close() invocations in particular gracefully. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-4787) KafkaStreams close() is not reentrant
Steven Schlansker created KAFKA-4787: Summary: KafkaStreams close() is not reentrant Key: KAFKA-4787 URL: https://issues.apache.org/jira/browse/KAFKA-4787 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 0.10.2.0 Reporter: Steven Schlansker While building a simple application, I tried to implement a failure policy where any uncaught exception terminates the application until an administrator can evaluate and intervene: {code} /** Handle any uncaught exception by shutting down the program. */ private void handleStreamException(Thread thread, Throwable t) { LOG.error("stream exception in thread {}", thread, t); streams.close(); } streams.setUncaughtExceptionHandler(this::handleStreamException); streams.start(); {code} Unfortunately, because the KafkaStreams#close() method takes a lock, this is prone to what looks like a deadlock: {code} "StreamThread-1" #80 prio=5 os_prio=0 tid=0x7f56096f4000 nid=0x40c8 waiting for monitor entry [0x7f54f03ee000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java) - waiting to lock <0xf171cda8> (a org.apache.kafka.streams.KafkaStreams) at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:438) at com.opentable.chat.service.ChatStorage$$Lambda$161/1940967023.close(Unknown Source) at com.opentable.chat.service.ChatStorage.closeLog(ChatStorage.java:212) at com.opentable.chat.service.ChatStorage.close(ChatStorage.java:207) at com.opentable.chat.service.ChatStorage.handleStreamException(ChatStorage.java:541) at com.opentable.chat.service.ChatStorage$$Lambda$123/149062221.uncaughtException(Unknown Source) at java.lang.Thread.dispatchUncaughtException(Thread.java:1956) "main" #1 prio=5 os_prio=0 tid=0x7f5608011000 nid=0x3f76 in Object.wait() [0x7f5610f04000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) at java.lang.Thread.join(Thread.java:1249) - locked <0xfd302bf0> (a java.lang.Thread) at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:494) - locked <0xf171cda8> (a org.apache.kafka.streams.KafkaStreams) at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:438) at com.opentable.chat.service.ChatStorage$$Lambda$161/1940967023.close(Unknown Source) at com.opentable.chat.service.ChatStorage.closeLog(ChatStorage.java:212) at com.opentable.chat.service.ChatStorage.close(ChatStorage.java:207) {code} Note how the main thread calls close(), which encounters an exception. It uses a StreamThread to dispatch to the handler, which calls close(). Once it tries to take the monitor, we are left in a position where main is joined on StreamThread-1, but StreamThread-1 is waiting for main to release that monitor. Arguably it's a bit abusive to call close() in this way (it certainly wasn't intentional) -- but to make Kafka Streams robust it should handle any sequence of close() invocations in particular gracefully. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
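One caller-side workaround sketch in the meantime (hypothetical application code, not a library fix): have the uncaught-exception handler hand close() to a dedicated thread and return immediately. The dying StreamThread then terminates, main's join() completes, the monitor is released, and the deferred close() proceeds:
{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch: never call streams.close() from a StreamThread itself; hand the
// close to a third thread so no thread blocks on the KafkaStreams monitor
// while another close() is joined on it.
public class SafeStreamsCloser {
    private static final Logger LOG = LoggerFactory.getLogger(SafeStreamsCloser.class);
    private final KafkaStreams streams;

    public SafeStreamsCloser(KafkaStreams streams) {
        this.streams = streams;
    }

    public void handleStreamException(Thread thread, Throwable t) {
        LOG.error("stream exception in thread {}", thread, t);
        // The second close() becomes a no-op once the first completes.
        new Thread(streams::close, "streams-closer").start();
    }
}
{code}
Registered as before via streams.setUncaughtExceptionHandler(closer::handleStreamException).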
[jira] [Updated] (KAFKA-4726) ValueMapper should have (read) access to key
[ https://issues.apache.org/jira/browse/KAFKA-4726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Schlansker updated KAFKA-4726: - Description: {{ValueMapper}} should have read-only access to the key for the value it is mapping. Sometimes the value transformation will depend on the key. It is possible to do this with a full blown {{KeyValueMapper}} but that loses the promise that you won't change the key -- so you might introduce a re-keying phase that is totally unnecessary. It also requires you to return an identity KeyValue object which costs something to construct (unless we are lucky and the optimizer elides it). [ If mapValues() is guaranteed to be no less efficient than map() the issue may be moot, but I presume there are some optimizations that are valid with the former but not latter. ] was: {{ValueMapper}} should have read-only access to the key for the value it is mapping. Sometimes the value transformation will depend on the key. It is possible to do this with a full blown {{KeyValueMapper}} but that loses the promise that you won't change the key -- so you might introduce a re-keying phase that is totally unnecessary. [ If mapValues() is guaranteed to be no less efficient than map() the issue may be moot, but I presume there are some optimizations that are valid with the former but not latter. ] > ValueMapper should have (read) access to key > > > Key: KAFKA-4726 > URL: https://issues.apache.org/jira/browse/KAFKA-4726 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.1.1 >Reporter: Steven Schlansker > > {{ValueMapper}} should have read-only access to the key for the value it is > mapping. Sometimes the value transformation will depend on the key. > It is possible to do this with a full blown {{KeyValueMapper}} but that loses > the promise that you won't change the key -- so you might introduce a > re-keying phase that is totally unnecessary. It also requires you to return > an identity KeyValue object which costs something to construct (unless we are > lucky and the optimizer elides it). > [ If mapValues() is guaranteed to be no less efficient than map() the issue > may be moot, but I presume there are some optimizations that are valid with > the former but not latter. ] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-4726) ValueMapper should have (read) access to key
Steven Schlansker created KAFKA-4726: Summary: ValueMapper should have (read) access to key Key: KAFKA-4726 URL: https://issues.apache.org/jira/browse/KAFKA-4726 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 0.10.1.1 Reporter: Steven Schlansker {{ValueMapper}} should have read-only access to the key for the value it is mapping. Sometimes the value transformation will depend on the key. It is possible to do this with a full blown {{KeyValueMapper}} but that loses the promise that you won't change the key -- so you might introduce a re-keying phase that is totally unnecessary. [ If mapValues() is guaranteed to be no less efficient than map() the issue may be moot, but I presume there are some optimizations that are valid with the former but not latter. ] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
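For reference, later Kafka Streams releases addressed this via KIP-149 with a read-only-key variant, {{ValueMapperWithKey}}, accepted by an overload of mapValues() so no re-keying phase is introduced. A usage sketch against the newer API (topic names illustrative):
{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

// The key is visible to the mapper but cannot be changed, so the operator
// remains a value-only transform and triggers no repartitioning.
public class MapValuesWithKeyExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic");
        input.mapValues((key, value) -> key + ":" + value) // read-only key
             .to("output-topic");
    }
}
{code}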
[jira] [Commented] (KAFKA-4722) StreamThread should allow customization of thread prefix
[ https://issues.apache.org/jira/browse/KAFKA-4722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15850243#comment-15850243 ] Steven Schlansker commented on KAFKA-4722: -- That would be perfect, thanks :) > StreamThread should allow customization of thread prefix > > > Key: KAFKA-4722 > URL: https://issues.apache.org/jira/browse/KAFKA-4722 > Project: Kafka > Issue Type: New Feature > Components: streams >Affects Versions: 0.10.1.1 >Reporter: Steven Schlansker > > StreamThread currently sets its name thusly: > {code} > super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement()); > {code} > When you have multiple KStream / KTables in an application, it would be nice > to customize the "StreamThread" prefix. The id is a good start but a > human-recognizable name would make logs much easier to read. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-4722) StreamThread should allow customization of thread prefix
[ https://issues.apache.org/jira/browse/KAFKA-4722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15849098#comment-15849098 ] Steven Schlansker commented on KAFKA-4722: -- I have multiple different functional sections of my application. Each has their own KStreamBuilder / KafkaStreams / KStream / KTable instances. An error thrown from one part of the application is very hard to differentiate from the others. For example right now I have: {code} 2017-02-01T14:41:22.120Z ERROR [StreamThread-2] o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-2] Streams application error during processing: java.lang.IllegalArgumentException: Assigned partition public.chat.message-0 for non-subscribed topic. at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:187) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:215) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) {code} and it is not clear which instance of {{KafkaStreams}} produced this. > StreamThread should allow customization of thread prefix > > > Key: KAFKA-4722 > URL: https://issues.apache.org/jira/browse/KAFKA-4722 > Project: Kafka > Issue Type: New Feature > Components: streams >Affects Versions: 0.10.1.1 >Reporter: Steven Schlansker > > StreamThread currently sets its name thusly: > {code} > super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement()); > {code} > When you have multiple KStream / KTables in an application, it would be nice > to customize the "StreamThread" prefix. The id is a good start but a > human-recognizable name would make logs much easier to read. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-4722) StreamThread should allow customization of thread prefix
Steven Schlansker created KAFKA-4722: Summary: StreamThread should allow customization of thread prefix Key: KAFKA-4722 URL: https://issues.apache.org/jira/browse/KAFKA-4722 Project: Kafka Issue Type: New Feature Components: streams Affects Versions: 0.10.1.1 Reporter: Steven Schlansker StreamThread currently sets its name thusly: {code} super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement()); {code} When you have multiple KStream / KTables in an application, it would be nice to customize the "StreamThread" prefix. The id is a good start but a human-recognizable name would make logs much easier to read. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
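For reference, in later Kafka Streams releases the stream threads take their name prefix from the configured streams client id, which gives each KafkaStreams instance a recognizable prefix in logs. A configuration sketch, values illustrative:
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class NamedStreamsConfig {
    public static Properties props() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "chat-service");
        // StreamThread names incorporate the client id, e.g.
        // "chat-storage-StreamThread-1", distinguishing instances in logs.
        props.put(StreamsConfig.CLIENT_ID_CONFIG, "chat-storage");
        return props;
    }
}
{code}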
[jira] [Created] (KAFKA-4721) KafkaStreams (and possibly others) should inherit Closeable
Steven Schlansker created KAFKA-4721: Summary: KafkaStreams (and possibly others) should inherit Closeable Key: KAFKA-4721 URL: https://issues.apache.org/jira/browse/KAFKA-4721 Project: Kafka Issue Type: New Feature Components: streams Affects Versions: 0.10.1.1 Reporter: Steven Schlansker KafkaStreams should inherit AutoCloseable or Closeable so that you can use try-with-resources: {code} try (KafkaStreams reader = storage.createStreams(builder)) { reader.start(); stopCondition.join(); } {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-4720) Add KStream.peek(ForeachAction<K,V>)
Steven Schlansker created KAFKA-4720: Summary: Add KStream.peek(ForeachAction<K,V>) Key: KAFKA-4720 URL: https://issues.apache.org/jira/browse/KAFKA-4720 Project: Kafka Issue Type: New Feature Components: streams Affects Versions: 0.10.1.1 Reporter: Steven Schlansker Java's Stream provides a handy peek method that observes elements in the stream without transforming or filtering them. While you can emulate this functionality with either a filter or map, peek provides potentially useful semantic information (doesn't modify the stream) and is much more concise. Example usage: using Dropwizard Metrics to provide event counters {code} KStream<Integer, String> s = ...; s.map(this::mungeData) .peek((i, s) -> metrics.noteMungedEvent(i, s)) .filter(this::hadProcessingError) .print(); {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-3921) Periodic refresh of metadata causes spurious log messages
Steven Schlansker created KAFKA-3921: Summary: Periodic refresh of metadata causes spurious log messages Key: KAFKA-3921 URL: https://issues.apache.org/jira/browse/KAFKA-3921 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 0.9.0.1 Reporter: Steven Schlansker Kafka cluster metadata has a configurable expiry period. (I don't understand why this is -- cluster updates can happen at any time, and we have to pick those up quicker than every 10 minutes? But this ticket isn't about that.) When this interval expires, the ClientUtils class spins up a SyncProducer, which sends a special message to retrieve metadata. The producer is then closed immediately after processing this message. This causes the SyncProducer to log both a connection open and close at INFO level: {code} 2016-06-30T17:50:19.408Z INFO <> [ProducerSendThread-central-buzzsaw-1-myhost] kafka.client.ClientUtils$ - Fetching metadata from broker BrokerEndPoint(2,broker-3.mycorp.com,9092) with correlation id 17188 for 1 topic(s) Set(logstash) 2016-06-30T17:50:19.410Z INFO <> [ProducerSendThread-central-buzzsaw-1-myhost] kafka.producer.SyncProducer - Connected to broker-3.mycorp.com:9092 for producing 2016-06-30T17:50:19.411Z INFO <> [ProducerSendThread-central-buzzsaw-1-myhost] kafka.producer.SyncProducer - Disconnecting from broker-3.mycorp.com:9092 2016-06-30T17:50:19.411Z INFO <> [ProducerSendThread-central-buzzsaw-1-myhost] kafka.producer.SyncProducer - Disconnecting from broker-14.mycorp.com:9092 2016-06-30T17:50:19.411Z INFO <> [ProducerSendThread-central-buzzsaw-1-myhost] kafka.producer.SyncProducer - Disconnecting from broker-logkafka-13.mycorp.com:9092 2016-06-30T17:50:19.411Z INFO <> [ProducerSendThread-central-buzzsaw-1-myhost] kafka.producer.SyncProducer - Disconnecting from broker-12.mycorp.com:9092 2016-06-30T17:50:19.413Z INFO <> [ProducerSendThread-central-buzzsaw-1-myhost] kafka.producer.SyncProducer - Connected to broker-12.mycorp.com:9092 for producing {code} When you are reading the logs, this appears periodically. We've had more than one administrator then think that the cluster is unhealthy, and client connections are getting dropped -- it's disconnecting from the broker so frequently! What is wrong??? But in reality, it is just this harmless / expected metadata update. Can we tweak the log levels so that the periodic background refresh does not log unless something goes wrong? The log messages are misleading and easy to misinterpret. I had to read the code pretty thoroughly to convince myself that these messages are actually harmless. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
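Short of a change in Kafka itself, the chatter can be demoted per deployment at the logging layer. A log4j configuration sketch, with logger names taken from the messages above (note the Scala object logger ends in $):
{code}
# Demote routine metadata-refresh connect/disconnect messages below INFO:
log4j.logger.kafka.producer.SyncProducer=WARN
log4j.logger.kafka.client.ClientUtils$=WARN
{code}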