[jira] [Resolved] (KAFKA-14942) CopyOnWriteMap implements ConcurrentMap but does not implement required default methods

2023-04-27 Thread Steven Schlansker (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Schlansker resolved KAFKA-14942.
---
Resolution: Invalid

> CopyOnWriteMap implements ConcurrentMap but does not implement required 
> default methods
> ---
>
> Key: KAFKA-14942
> URL: https://issues.apache.org/jira/browse/KAFKA-14942
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.4.0
>Reporter: Steven Schlansker
>Priority: Minor
>
> Hi Kafka team,
> I was reading through the kafka-clients CopyOnWriteMap while investigating a 
> problem in a different library, and I think it is declaring that it is a 
> ConcurrentMap but does not completely implement that interface.
> In particular, it inherits e.g. computeIfAbsent as a default method from Map, 
> which is noted to be a non-atomic implementation, and is not synchronized in 
> any way. I think this can lead to a reader experiencing a map whose contents 
> are not consistent with any serial execution of write ops.
>  
> Consider a thread T1 which calls computeIfAbsent("a", _ -> "1")
> T1 computeIfAbsent calls get("a") and observes null, and is then pre-empted
> T2 calls put("a", "2"), which copies the (empty) backing map and stores 
> \{"a": "2"}
> T1 computeIfAbsent then wakes up, still thinking the value is null, and calls 
> put("a", "1").
>  
> This leads to the map finishing with the contents \{"a":"1"}, while any 
> serial execution of these two operations should always finish with \{"a":"2"}.
>  
> I think CopyOnWriteMap should either re-implement all mutating default 
> methods (at least as synchronized), or, if this is a special internal map and 
> we know those methods will never be called, perhaps have them throw 
> UnsupportedOperationException or at least document the class as not a 
> complete and proper ConcurrentMap implementation.
>  
> Thank you for your consideration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14942) CopyOnWriteMap implements ConcurrentMap but does not implement required default methods

2023-04-26 Thread Steven Schlansker (Jira)
Steven Schlansker created KAFKA-14942:
-

 Summary: CopyOnWriteMap implements ConcurrentMap but does not 
implement required default methods
 Key: KAFKA-14942
 URL: https://issues.apache.org/jira/browse/KAFKA-14942
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 3.4.0
Reporter: Steven Schlansker


Hi Kafka team,

I was reading through the kafka-clients CopyOnWriteMap while investigating a 
problem in a different library, and I think it is declaring that it is a 
ConcurrentMap but does not completely implement that interface.

In particular, it inherits e.g. computeIfAbsent as a default method from Map, 
which is noted to be a non-atomic implementation, and is not synchronized in 
any way. I think this can lead to a reader experiencing a map whose contents 
are not consistent with any serial execution of write ops.

 

Consider a thread T1 which calls computeIfAbsent("a", _ -> "1")

T1 computeIfAbsent calls get("a") and observes null, and is then pre-empted

T2 calls put("a", "2"), which copies the (empty) backing map and stores \{"a": 
"2"}

T1 computeIfAbsent then wakes up, still thinking the value is null, and calls 
put("a", "1").

 

This leads to the map finishing with the contents \{"a":"1"}, while any serial 
execution of these two operations should always finish with \{"a":"2"}.
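To make the interleaving concrete, here is a minimal, self-contained sketch. It uses a hypothetical stand-in map (deliberately not the real CopyOnWriteMap) that inherits Map's default computeIfAbsent, and a slow mapping function stands in for T1 being pre-empted:

{code:java}
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical minimal copy-on-write map (NOT the real CopyOnWriteMap) that
// inherits java.util.Map's default, non-atomic computeIfAbsent.
class NaiveCowMap<K, V> extends AbstractMap<K, V> {
    private volatile Map<K, V> snapshot = Collections.emptyMap();

    @Override
    public synchronized V put(K key, V value) {
        Map<K, V> copy = new HashMap<>(snapshot); // copy the backing map
        V prev = copy.put(key, value);
        snapshot = Collections.unmodifiableMap(copy);
        return prev;
    }

    @Override
    public Set<Map.Entry<K, V>> entrySet() {
        return snapshot.entrySet();               // reads see the current snapshot
    }
}

public class ComputeIfAbsentRaceDemo {
    public static void main(String[] args) throws InterruptedException {
        Map<String, String> map = new NaiveCowMap<>();
        // T1: inherited computeIfAbsent; the slow mapping function widens the
        // window between its internal get("a") and put("a", "1").
        Thread t1 = new Thread(() -> map.computeIfAbsent("a", k -> { sleep(500); return "1"; }));
        // T2: plain put, which copies the (empty) backing map and stores {"a": "2"}.
        Thread t2 = new Thread(() -> { sleep(100); map.put("a", "2"); });
        t1.start(); t2.start();
        t1.join(); t2.join();
        System.out.println(map.get("a")); // prints "1", though any serial order ends with "2"
    }

    static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }
}
{code}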

 

I think CopyOnWriteMap should either re-implement all mutating default methods 
(at least as synchronized), or, if this is a special internal map and we know 
those methods will never be called, perhaps have them throw 
UnsupportedOperationException or at least document the class as not a complete 
and proper ConcurrentMap implementation.

 

Thank you for your consideration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13593) ThrottledChannelReaper slows broker shutdown by multiple seconds

2022-01-13 Thread Steven Schlansker (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Schlansker resolved KAFKA-13593.
---
Resolution: Fixed

> ThrottledChannelReaper slows broker shutdown by multiple seconds
> 
>
> Key: KAFKA-13593
> URL: https://issues.apache.org/jira/browse/KAFKA-13593
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.0.0
>Reporter: Steven Schlansker
>Priority: Minor
>
> We run an embedded KRaft broker in integration tests, to test that our 
> Producer / Consumers are all hooked up and verify our overall pipeline.
> While trying to deliver CI speed improvements, we noticed that the majority 
> of time for a small test is actually spent in Kafka broker shutdown. From the 
> logs, it looks like the ClientQuotaManager has multiple 
> ThrottledChannelReaper threads and each of them takes up to a second to 
> shut down.
> {code:java}
> 2022-01-12T15:26:31.932Z [main] INFO  kafka.log.LogManager - Shutdown 
> complete.
> 2022-01-12T15:26:31.934Z [main] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-Fetch]: Shutting down
> 2022-01-12T15:26:32.311Z [main] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-Fetch]: Shutdown completed
> 2022-01-12T15:26:32.311Z [ThrottledChannelReaper-Fetch] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-Fetch]: Stopped
> 2022-01-12T15:26:32.311Z [main] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-Produce]: Shutting down
> 2022-01-12T15:26:33.312Z [ThrottledChannelReaper-Produce] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-Produce]: Stopped
> 2022-01-12T15:26:33.312Z [main] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-Produce]: Shutdown completed
> 2022-01-12T15:26:33.312Z [main] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-Request]: Shutting down
> 2022-01-12T15:26:34.315Z [ThrottledChannelReaper-Request] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-Request]: Stopped
> 2022-01-12T15:26:34.315Z [main] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-Request]: Shutdown completed
> 2022-01-12T15:26:34.315Z [main] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-ControllerMutation]: Shutting down
> 2022-01-12T15:26:35.317Z [main] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-ControllerMutation]: Shutdown completed
> 2022-01-12T15:26:35.317Z [ThrottledChannelReaper-ControllerMutation] INFO  
> k.s.ClientQuotaManager$ThrottledChannelReaper - 
> [ThrottledChannelReaper-ControllerMutation]: Stopped{code}
> Inspecting the code, the ThrottledChannelReaper threads are marked as not 
> interruptible, so ShutdownableThread does not interrupt the worker on 
> shutdown. Unfortunately, the doWork method polls with a 1 second timeout. So 
> you wait up to 1s for every type of quota, in this case for a total of almost 
> 4s.
>  
> While this is not a problem for production Kafka brokers, where instances are 
> expected to stay up for months, in tests that expect to spin up and down it 
> is easily noticed and adds real overhead to CI.
>  
> Suggested possible remediations:
>  * Allow ThrottledChannelReaper to be interrupted by ShutdownableThread
>  * Have ThrottledChannelReaper override initiateShutdown and, after calling 
> {{super.initiateShutdown}}, enqueue a {{null}} element on the delayQueue 
> to force the worker thread to notice the shutdown state
>  * Reduce 1s poll timeout to something small (carries overhead penalty for 
> all users though, so this is less desirable), or make it configurable so we 
> can set it to e.g. 10ms in unit tests



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13593) ThrottledChannelReaper slows broker shutdown by multiple seconds

2022-01-12 Thread Steven Schlansker (Jira)
Steven Schlansker created KAFKA-13593:
-

 Summary: ThrottledChannelReaper slows broker shutdown by multiple 
seconds
 Key: KAFKA-13593
 URL: https://issues.apache.org/jira/browse/KAFKA-13593
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 3.0.0
Reporter: Steven Schlansker


We run an embedded KRaft broker in integration tests, to test that our Producer 
/ Consumers are all hooked up and verify our overall pipeline.

While trying to deliver CI speed improvements, we noticed that the majority of 
time for a small test is actually spent in Kafka broker shutdown. From the 
logs, it looks like the ClientQuotaManager has multiple ThrottledChannelReaper 
threads and each of them takes up to a second to shut down.
{code:java}
2022-01-12T15:26:31.932Z [main] INFO  kafka.log.LogManager - Shutdown complete.
2022-01-12T15:26:31.934Z [main] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Fetch]: 
Shutting down
2022-01-12T15:26:32.311Z [main] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Fetch]: 
Shutdown completed
2022-01-12T15:26:32.311Z [ThrottledChannelReaper-Fetch] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Fetch]: 
Stopped
2022-01-12T15:26:32.311Z [main] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - 
[ThrottledChannelReaper-Produce]: Shutting down
2022-01-12T15:26:33.312Z [ThrottledChannelReaper-Produce] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - 
[ThrottledChannelReaper-Produce]: Stopped
2022-01-12T15:26:33.312Z [main] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - 
[ThrottledChannelReaper-Produce]: Shutdown completed
2022-01-12T15:26:33.312Z [main] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - 
[ThrottledChannelReaper-Request]: Shutting down
2022-01-12T15:26:34.315Z [ThrottledChannelReaper-Request] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - 
[ThrottledChannelReaper-Request]: Stopped
2022-01-12T15:26:34.315Z [main] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - 
[ThrottledChannelReaper-Request]: Shutdown completed
2022-01-12T15:26:34.315Z [main] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - 
[ThrottledChannelReaper-ControllerMutation]: Shutting down
2022-01-12T15:26:35.317Z [main] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - 
[ThrottledChannelReaper-ControllerMutation]: Shutdown completed
2022-01-12T15:26:35.317Z [ThrottledChannelReaper-ControllerMutation] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - 
[ThrottledChannelReaper-ControllerMutation]: Stopped{code}
Inspecting the code, the ThrottledChannelReaper threads are marked as not 
interruptible, so ShutdownableThread does not interrupt the worker on shutdown. 
Unfortunately, the doWork method polls with a 1 second timeout. So you wait up 
to 1s for every type of quota, in this case for a total of almost 4s.

 

While this is not a problem for production Kafka brokers, where instances are 
expected to stay up for months, in tests that expect to spin up and down it is 
easily noticed and adds real overhead to CI.

 

Suggested possible remediations:
 * Allow ThrottledChannelReaper to be interrupted by ShutdownableThread
 * Have ThrottledChannelReaper override initiateShutdown and, after calling 
{{super.initiateShutdown}}, enqueue a {{null}} element on the delayQueue 
to force the worker thread to notice the shutdown state (sketched below)
 * Reduce 1s poll timeout to something small (carries overhead penalty for all 
users though, so this is less desirable), or make it configurable so we can set 
it to e.g. 10ms in unit tests
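A rough Java sketch of the second option, using stand-in classes (the real ThrottledChannelReaper and ShutdownableThread are Scala, and every name below is hypothetical). One wrinkle: java.util.concurrent.DelayQueue rejects nulls, so a zero-delay sentinel element plays the role of the {{null}} described above:

{code:java}
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the "wake the reaper on shutdown" idea.
class ReaperSketch extends Thread {
    private final DelayQueue<DelayedItem> delayQueue = new DelayQueue<>();
    private final AtomicBoolean running = new AtomicBoolean(true);

    @Override
    public void run() {
        while (running.get()) {
            try {
                // Without the sentinel below, shutdown waits up to the full
                // poll timeout (1s in the broker) before noticing running=false.
                DelayedItem item = delayQueue.poll(1, TimeUnit.SECONDS);
                if (item != null && !item.sentinel) {
                    // ... unthrottle the expired channel ...
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public void initiateShutdown() {
        running.set(false);                       // what super.initiateShutdown does
        delayQueue.add(DelayedItem.sentinel());   // zero-delay element: poll() returns at once
    }

    static final class DelayedItem implements Delayed {
        final long expireAtMs;
        final boolean sentinel;

        DelayedItem(long expireAtMs, boolean sentinel) {
            this.expireAtMs = expireAtMs;
            this.sentinel = sentinel;
        }

        static DelayedItem sentinel() {
            return new DelayedItem(System.currentTimeMillis(), true);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireAtMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }
}
{code}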



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-6767) OffsetCheckpoint write assumes parent directory exists

2018-04-09 Thread Steven Schlansker (JIRA)
Steven Schlansker created KAFKA-6767:


 Summary: OffsetCheckpoint write assumes parent directory exists
 Key: KAFKA-6767
 URL: https://issues.apache.org/jira/browse/KAFKA-6767
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.1.0
Reporter: Steven Schlansker


We run Kafka Streams with RocksDB state stores on ephemeral disks (i.e. if an 
instance dies it is created from scratch, rather than reusing the existing 
RocksDB.)

We routinely see:
{code:java}
2018-04-09T19:14:35.004Z WARN <> 
[chat-0319e3c3-d8b2-4c60-bd69-a8484d8d4435-StreamThread-1] 
o.a.k.s.p.i.ProcessorStateManager - task [0_11] Failed to write offset 
checkpoint file to /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint: {}
java.io.FileNotFoundException: 
/mnt/mesos/sandbox/storage/chat/0_11/.checkpoint.tmp (No such file or directory)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
at 
org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:78)
at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:320)
at 
org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:314)
at 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307)
at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297)
at 
org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
at 
org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357)
at 
org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347)
at 
org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403)
at 
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720){code}
Inspecting the state store directory, I can indeed see that {{chat/0_11}} does 
not exist (although many other partitions do).

 

Looking at the OffsetCheckpoint write method, it seems to try to open a new 
checkpoint file without first ensuring that the parent directory exists.

 
{code:java}
    public void write(final Map<TopicPartition, Long> offsets) throws IOException {
        // if there is no offsets, skip writing the file to save disk IOs
        if (offsets.isEmpty()) {
            return;
        }

        synchronized (lock) {
            // write to temp file and then swap with the existing file
            final File temp = new File(file.getAbsolutePath() + ".tmp");{code}
 

Either the OffsetCheckpoint class should initialize the directories if needed, 
or some precondition of it being called should ensure that is the case.
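A minimal sketch of the first option, as an illustration rather than an actual patch:

{code:java}
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;

final class CheckpointDirSketch {
    // Illustration only: ensure the task directory exists before opening the
    // .checkpoint.tmp file, so the FileNotFoundException above cannot happen.
    static FileOutputStream openTempCheckpoint(final File checkpointFile) throws IOException {
        final File temp = new File(checkpointFile.getAbsolutePath() + ".tmp");
        final File parent = temp.getParentFile();
        if (parent != null) {
            Files.createDirectories(parent.toPath()); // no-op if it already exists
        }
        return new FileOutputStream(temp);
    }
}
{code}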



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4829) Improve logging of StreamTask commits

2017-06-15 Thread Steven Schlansker (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050793#comment-16050793
 ] 

Steven Schlansker commented on KAFKA-4829:
--

This is talking about application logs; the link seems to refer to database 
logs, not at all the same thing.  Or am I missing something?

> Improve logging of StreamTask commits
> -
>
> Key: KAFKA-4829
> URL: https://issues.apache.org/jira/browse/KAFKA-4829
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Steven Schlansker
>Priority: Minor
>  Labels: user-experience
>
> Currently I see this every commit interval:
> {code}
> 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] 
> o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing 
> task StreamTask 1_31
> 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] 
> o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing 
> task StreamTask 2_31
> {code}
> We have ~10 tasks in our topology, 4 topics, and 32 partitions per topic.
> This means every commit interval we log a few hundred lines of the above
> which is an order of magnitude chattier than anything else in the log
> during normal operations.
> To improve visibility of important messages, we should reduce the chattiness 
> of normal commits and highlight abnormal commits.  An example proposal:
> existing message is fine at TRACE level for diagnostics
> {{TRACE o.a.k.s.p.i.StreamThread - Committing task StreamTask 1_31}}
> normal fast case, wrap them all up into one summary line
> {{INFO o.a.k.s.p.i.StreamThreads - 64 stream tasks committed in 25ms}}
> some kind of threshold / messaging in case it doesn't complete quickly or 
> logs an exception
> {{ERROR o.a.k.s.p.i.StreamThread - StreamTask 1_32 did not commit in 100ms}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-4829) Improve logging of StreamTask commits

2017-06-15 Thread Steven Schlansker (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16050793#comment-16050793
 ] 

Steven Schlansker edited comment on KAFKA-4829 at 6/15/17 5:10 PM:
---

This ticket is talking about application logs; the link seems to refer to 
database logs, not at all the same thing.  Or am I missing something?


was (Author: stevenschlansker):
This is talking about application logs; the link seems to refer to database 
logs, not at all the same thing.  Or am I missing something?

> Improve logging of StreamTask commits
> -
>
> Key: KAFKA-4829
> URL: https://issues.apache.org/jira/browse/KAFKA-4829
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Steven Schlansker
>Priority: Minor
>  Labels: user-experience
>
> Currently I see this every commit interval:
> {code}
> 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] 
> o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing 
> task StreamTask 1_31
> 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] 
> o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing 
> task StreamTask 2_31
> {code}
> We have ~10 tasks in our topology, 4 topics, and 32 partitions per topic.
> This means every commit interval we log a few hundred lines of the above
> which is an order of magnitude chattier than anything else in the log
> during normal operations.
> To improve visibility of important messages, we should reduce the chattiness 
> of normal commits and highlight abnormal commits.  An example proposal:
> existing message is fine at TRACE level for diagnostics
> {{TRACE o.a.k.s.p.i.StreamThread - Committing task StreamTask 1_31}}
> normal fast case, wrap them all up into one summary line
> {{INFO o.a.k.s.p.i.StreamThreads - 64 stream tasks committed in 25ms}}
> some kind of threshold / messaging in case it doesn't complete quickly or 
> logs an exception
> {{ERROR o.a.k.s.p.i.StreamThread - StreamTask 1_32 did not commit in 100ms}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-1313) Support adding replicas to existing topic partitions via kafka-topics tool without manually setting broker assignments

2017-05-02 Thread Steven Schlansker (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15993414#comment-15993414
 ] 

Steven Schlansker commented on KAFKA-1313:
--

I started using Kafka Streams recently.  I did not know to configure the 
{{replication.factor}} tuneable and so now all of my automatically generated 
topics have the wrong replication factor.  I tried to update via 
{{kafka-topics.sh}} and obviously ended up here.  I understand why this got 
deprioritized, but consider now that in addition to an administrator creating 
topics (where they have an opportunity to set replication factor right), Kafka 
Streams creates topics behind your back and you may not realize your 
replication factor is wrong until you have a lot of existing data.

I can obviously fix it up by hand as outlined above, but this is a pretty big 
wart and it seems well worth fixing.
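
For anyone else who lands here from Kafka Streams, a minimal sketch of the setting that avoids the problem up front (application id and bootstrap address are placeholders):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final class StreamsReplicationConfigSketch {
    static Properties streamsProps() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");         // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        // Must be set before the internal (changelog/repartition) topics are
        // first created; the default replication factor is 1.
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
        return props;
    }
}
{code}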

> Support adding replicas to existing topic partitions via kafka-topics tool 
> without manually setting broker assignments
> --
>
> Key: KAFKA-1313
> URL: https://issues.apache.org/jira/browse/KAFKA-1313
> Project: Kafka
>  Issue Type: New Feature
>  Components: tools
>Affects Versions: 0.8.0
>Reporter: Marc Labbe
>Assignee: Sreepathi Prasanna
>  Labels: newbie++
>
> There is currently no easy way to add replicas to an existing topic 
> partitions.
> For example, topic create-test has been created with ReplicationFactor=1: 
> Topic:create-test  PartitionCount:3  ReplicationFactor:1  Configs:
> Topic: create-test  Partition: 0  Leader: 1  Replicas: 1  Isr: 1
> Topic: create-test  Partition: 1  Leader: 2  Replicas: 2  Isr: 2
> Topic: create-test  Partition: 2  Leader: 3  Replicas: 3  Isr: 3
> I would like to increase the ReplicationFactor=2 (or more) so it shows up 
> like this instead.
> Topic:create-test  PartitionCount:3  ReplicationFactor:2  Configs:
> Topic: create-test  Partition: 0  Leader: 1  Replicas: 1,2  Isr: 1,2
> Topic: create-test  Partition: 1  Leader: 2  Replicas: 2,3  Isr: 2,3
> Topic: create-test  Partition: 2  Leader: 3  Replicas: 3,1  Isr: 3,1
> Use cases for this:
> - adding brokers and thus increase fault tolerance
> - fixing human errors for topics created with wrong values



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4829) Improve logging of StreamTask commits

2017-03-01 Thread Steven Schlansker (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Schlansker updated KAFKA-4829:
-
Description: 
Currently I see this every commit interval:

{code}
2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] 
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing 
task StreamTask 1_31
2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] 
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing 
task StreamTask 2_31
{code}

We have ~10 tasks in our topology, 4 topics, and 32 partitions per topic.
This means every commit interval we log a few hundred lines of the above
which is an order of magnitude chattier than anything else in the log
during normal operations.

To improve visibility of important messages, we should reduce the chattiness of 
normal commits and highlight abnormal commits.  An example proposal:

existing message is fine at TRACE level for diagnostics
{{TRACE o.a.k.s.p.i.StreamThread - Committing task StreamTask 1_31}}

normal fast case, wrap them all up into one summary line
{{INFO o.a.k.s.p.i.StreamThreads - 64 stream tasks committed in 25ms}}

some kind of threshold / messaging in case it doesn't complete quickly or logs 
an exception
{{ERROR o.a.k.s.p.i.StreamThread - StreamTask 1_32 did not commit in 100ms}}

  was:
Currently I see this every commit interval:

{code}
2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] 
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing 
task StreamTask 1_31
2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] 
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing 
task StreamTask 2_31
{code}

We have ~10 tasks in our topology, 4 topics, and 32 partitions per topic.
This means every commit interval we log a few hundred lines of the above
which is an order of magnitude chattier than anything else in the log
during normal operations.

To improve visibility of important messages, we should reduce the chattiness of 
normal commits and highlight abnormal commits.  An example proposal:

existing message is fine at TRACE level for diagnostics
{{TRACE o.a.k.s.p.i.StreamThread - Committing task StreamTask 1_31}}

normal fast case, wrap them all up into one summary line
{{INFO o.a.k.s.p.i.StreamThreads - 64 stream tasks committed in 25ms}}

some kind of threshold / messaging in case it doesn't complete quickly
or logs an exception
{{ERROR o.a.k.s.p.i.StreamThread - StreamTask 1_32 did not commit in 100ms}}

Thoughts?


> Improve logging of StreamTask commits
> -
>
> Key: KAFKA-4829
> URL: https://issues.apache.org/jira/browse/KAFKA-4829
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Steven Schlansker
>Priority: Minor
>
> Currently I see this every commit interval:
> {code}
> 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] 
> o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing 
> task StreamTask 1_31
> 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] 
> o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing 
> task StreamTask 2_31
> {code}
> We have ~10 tasks in our topology, 4 topics, and 32 partitions per topic.
> This means every commit interval we log a few hundred lines of the above
> which is an order of magnitude chattier than anything else in the log
> during normal operations.
> To improve visibility of important messages, we should reduce the chattiness 
> of normal commits and highlight abnormal commits.  An example proposal:
> existing message is fine at TRACE level for diagnostics
> {{TRACE o.a.k.s.p.i.StreamThread - Committing task StreamTask 1_31}}
> normal fast case, wrap them all up into one summary line
> {{INFO o.a.k.s.p.i.StreamThreads - 64 stream tasks committed in 25ms}}
> some kind of threshold / messaging in case it doesn't complete quickly or 
> logs an exception
> {{ERROR o.a.k.s.p.i.StreamThread - StreamTask 1_32 did not commit in 100ms}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4829) Improve logging of StreamTask commits

2017-03-01 Thread Steven Schlansker (JIRA)
Steven Schlansker created KAFKA-4829:


 Summary: Improve logging of StreamTask commits
 Key: KAFKA-4829
 URL: https://issues.apache.org/jira/browse/KAFKA-4829
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 0.10.2.0
Reporter: Steven Schlansker
Priority: Minor


Currently I see this every commit interval:

2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] 
o.a.k.s.p.internals.StreamThread
- stream-thread [StreamThread-1] Committing task StreamTask 1_31
2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] 
o.a.k.s.p.internals.StreamThread
- stream-thread [StreamThread-1] Committing task StreamTask 2_31

We have ~10 tasks in our topology, 4 topics, and 32 partitions per topic.
This means every commit interval we log a few hundred lines of the above
which is an order of magnitude chattier than anything else in the log
during normal operations.

To improve visibility of important messages, we should reduce the chattiness of 
normal commits and highlight abnormal commits.  An example proposal:

existing message is fine at TRACE level for diagnostics
{{TRACE o.a.k.s.p.i.StreamThread - Committing task StreamTask 1_31}}

normal fast case, wrap them all up into one summary line
{{INFO o.a.k.s.p.i.StreamThreads - 64 stream tasks committed in 25ms}}

some kind of threshold / messaging in case it doesn't complete quickly
or logs an exception
{{ERROR o.a.k.s.p.i.StreamThread - StreamTask 1_32 did not commit in 100ms}}

Thoughts?
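
A rough sketch of the proposal, as a hypothetical helper rather than the actual StreamThread code:

{code:java}
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical helper sketching the proposal above; not the actual StreamThread code.
final class CommitLoggingSketch {
    private static final Logger LOG = LoggerFactory.getLogger(CommitLoggingSketch.class);
    private static final long SLOW_COMMIT_MS = 100;

    static void commitAll(List<StreamTaskHandle> tasks) {
        final long start = System.currentTimeMillis();
        for (StreamTaskHandle task : tasks) {
            LOG.trace("Committing task StreamTask {}", task.id());  // per-task detail at TRACE
            final long taskStart = System.currentTimeMillis();
            task.commit();
            final long taskMs = System.currentTimeMillis() - taskStart;
            if (taskMs > SLOW_COMMIT_MS) {
                LOG.error("StreamTask {} did not commit in {}ms", task.id(), SLOW_COMMIT_MS);
            }
        }
        // Normal fast case: one summary line instead of hundreds.
        LOG.info("{} stream tasks committed in {}ms", tasks.size(), System.currentTimeMillis() - start);
    }

    interface StreamTaskHandle {
        String id();
        void commit();
    }
}
{code}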



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4829) Improve logging of StreamTask commits

2017-03-01 Thread Steven Schlansker (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Schlansker updated KAFKA-4829:
-
Description: 
Currently I see this every commit interval:

{code}
2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] 
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing 
task StreamTask 1_31
2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] 
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing 
task StreamTask 2_31
{code}

We have ~10 tasks in our topology, 4 topics, and 32 partitions per topic.
This means every commit interval we log a few hundred lines of the above
which is an order of magnitude chattier than anything else in the log
during normal operations.

To improve visibility of important messages, we should reduce the chattiness of 
normal commits and highlight abnormal commits.  An example proposal:

existing message is fine at TRACE level for diagnostics
{{TRACE o.a.k.s.p.i.StreamThread - Committing task StreamTask 1_31}}

normal fast case, wrap them all up into one summary line
{{INFO o.a.k.s.p.i.StreamThreads - 64 stream tasks committed in 25ms}}

some kind of threshold / messaging in case it doesn't complete quickly
or logs an exception
{{ERROR o.a.k.s.p.i.StreamThread - StreamTask 1_32 did not commit in 100ms}}

Thoughts?

  was:
Currently I see this every commit interval:

2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] 
o.a.k.s.p.internals.StreamThread
- stream-thread [StreamThread-1] Committing task StreamTask 1_31
2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] 
o.a.k.s.p.internals.StreamThread
- stream-thread [StreamThread-1] Committing task StreamTask 2_31

We have ~10 tasks in our topology, 4 topics, and 32 partitions per topic.
This means every commit interval we log a few hundred lines of the above
which is an order of magnitude chattier than anything else in the log
during normal operations.

To improve visibility of important messages, we should reduce the chattiness of 
normal commits and highlight abnormal commits.  An example proposal:

existing message is fine at TRACE level for diagnostics
{{TRACE o.a.k.s.p.i.StreamThread - Committing task StreamTask 1_31}}

normal fast case, wrap them all up into one summary line
{{INFO o.a.k.s.p.i.StreamThreads - 64 stream tasks committed in 25ms}}

some kind of threshold / messaging in case it doesn't complete quickly
or logs an exception
{{ERROR o.a.k.s.p.i.StreamThread - StreamTask 1_32 did not commit in 100ms}}

Thoughts?


> Improve logging of StreamTask commits
> -
>
> Key: KAFKA-4829
> URL: https://issues.apache.org/jira/browse/KAFKA-4829
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Steven Schlansker
>Priority: Minor
>
> Currently I see this every commit interval:
> {code}
> 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] 
> o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing 
> task StreamTask 1_31
> 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] 
> o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing 
> task StreamTask 2_31
> {code}
> We have ~10 tasks in our topology, 4 topics, and 32 partitions per topic.
> This means every commit interval we log a few hundred lines of the above
> which is an order of magnitude chattier than anything else in the log
> during normal operations.
> To improve visibility of important messages, we should reduce the chattiness 
> of normal commits and highlight abnormal commits.  An example proposal:
> existing message is fine at TRACE level for diagnostics
> {{TRACE o.a.k.s.p.i.StreamThread - Committing task StreamTask 1_31}}
> normal fast case, wrap them all up into one summary line
> {{INFO o.a.k.s.p.i.StreamThreads - 64 stream tasks committed in 25ms}}
> some kind of threshold / messaging in case it doesn't complete quickly
> or logs an exception
> {{ERROR o.a.k.s.p.i.StreamThread - StreamTask 1_32 did not commit in 100ms}}
> Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4787) KafkaStreams close() is not reentrant

2017-02-22 Thread Steven Schlansker (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15879336#comment-15879336
 ] 

Steven Schlansker commented on KAFKA-4787:
--

I'm not sure I understand why that should behave differently.  Replacing the 
lambda expression ({{this::handleStreamException}}) with your anonymous 
subclass implementation should be equivalent with regard to taking the 
{{synchronized}} monitor of the KafkaStreams object inside the {{public 
synchronized void close(long, TimeUnit)}} method.

I will test it anyway though and see if it in fact does behave differently.

> KafkaStreams close() is not reentrant
> -
>
> Key: KAFKA-4787
> URL: https://issues.apache.org/jira/browse/KAFKA-4787
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Steven Schlansker
>
> While building a simple application, I tried to implement a failure policy 
> where any uncaught exception terminates the application until an 
> administrator can evaluate and intervene:
> {code}
> /** Handle any uncaught exception by shutting down the program. */
> private void handleStreamException(Thread thread, Throwable t) {
> LOG.error("stream exception in thread {}", thread, t);
> streams.close();
> }
> streams.setUncaughtExceptionHandler(this::handleStreamException);
> streams.start();
> {code}
> Unfortunately, because the KafkaStreams#close() method takes a lock, this is 
> prone to what looks like a deadlock:
> {code}
> "StreamThread-1" #80 prio=5 os_prio=0 tid=0x7f56096f4000 nid=0x40c8 
> waiting for monitor entry [0x7f54f03ee000]
>java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java)
> - waiting to lock <0xf171cda8> (a 
> org.apache.kafka.streams.KafkaStreams)
> at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:438)
> at 
> com.opentable.chat.service.ChatStorage$$Lambda$161/1940967023.close(Unknown 
> Source)
> at 
> com.opentable.chat.service.ChatStorage.closeLog(ChatStorage.java:212)
> at com.opentable.chat.service.ChatStorage.close(ChatStorage.java:207)
> at 
> com.opentable.chat.service.ChatStorage.handleStreamException(ChatStorage.java:541)
> at 
> com.opentable.chat.service.ChatStorage$$Lambda$123/149062221.uncaughtException(Unknown
>  Source)
> at java.lang.Thread.dispatchUncaughtException(Thread.java:1956)
> "main" #1 prio=5 os_prio=0 tid=0x7f5608011000 nid=0x3f76 in Object.wait() 
> [0x7f5610f04000]
>java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.Thread.join(Thread.java:1249)
> - locked <0xfd302bf0> (a java.lang.Thread)
> at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:494)
> - locked <0xf171cda8> (a 
> org.apache.kafka.streams.KafkaStreams)
> at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:438)
> at 
> com.opentable.chat.service.ChatStorage$$Lambda$161/1940967023.close(Unknown 
> Source)
> at 
> com.opentable.chat.service.ChatStorage.closeLog(ChatStorage.java:212)
> at com.opentable.chat.service.ChatStorage.close(ChatStorage.java:207)
> {code}
> Note how the main thread calls close(), which encounters an exception.  It 
> uses a StreamThread to dispatch to the handler, which calls close().  Once it 
> tries to take the monitor, we are left in a position where main is joined on 
> StreamThread-1, but StreamThread-1 is waiting for main to release that 
> monitor.
> Arguably it's a bit abusive to call close() in this way (it certainly wasn't 
> intentional) -- but to make Kafka Streams robust it should handle any 
> sequence of close() invocations in particular gracefully.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4787) KafkaStreams close() is not reentrant

2017-02-22 Thread Steven Schlansker (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15879019#comment-15879019
 ] 

Steven Schlansker commented on KAFKA-4787:
--

A simple solution could be to have further close() calls skip the inner 
synchronized close overload and instead join on the close request that is 
already in progress.
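
A rough sketch of that idea in isolation (a hypothetical wrapper, not the actual KafkaStreams internals):

{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical wrapper sketching the idea: only the first close() performs the
// shutdown; later calls (even from a StreamThread) just await it, so no caller
// ever blocks on a monitor held by a thread it is joining.
final class ReentrantCloseSketch {
    private final AtomicBoolean closeStarted = new AtomicBoolean(false);
    private final CountDownLatch closed = new CountDownLatch(1);
    private final Runnable doShutdown;     // e.g. stop the stream threads, then join them

    ReentrantCloseSketch(Runnable doShutdown) {
        this.doShutdown = doShutdown;
    }

    boolean close(long timeout, TimeUnit unit) throws InterruptedException {
        if (closeStarted.compareAndSet(false, true)) {
            // First caller: run the real shutdown on a dedicated thread so a
            // close() issued from a stream thread cannot deadlock on itself.
            Thread shutdown = new Thread(() -> {
                doShutdown.run();
                closed.countDown();
            }, "kafka-streams-close-thread");
            shutdown.setDaemon(true);
            shutdown.start();
        }
        // Every caller (first or not) just waits for the single close to finish.
        return closed.await(timeout, unit);
    }
}
{code}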

> KafkaStreams close() is not reentrant
> -
>
> Key: KAFKA-4787
> URL: https://issues.apache.org/jira/browse/KAFKA-4787
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Steven Schlansker
>
> While building a simple application, I tried to implement a failure policy 
> where any uncaught exception terminates the application until an 
> administrator can evaluate and intervene:
> {code}
> /** Handle any uncaught exception by shutting down the program. */
> private void handleStreamException(Thread thread, Throwable t) {
> LOG.error("stream exception in thread {}", thread, t);
> streams.close();
> }
> streams.setUncaughtExceptionHandler(this::handleStreamException);
> streams.start();
> {code}
> Unfortunately, because the KafkaStreams#close() method takes a lock, this is 
> prone to what looks like a deadlock:
> {code}
> "StreamThread-1" #80 prio=5 os_prio=0 tid=0x7f56096f4000 nid=0x40c8 
> waiting for monitor entry [0x7f54f03ee000]
>java.lang.Thread.State: BLOCKED (on object monitor)
> at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java)
> - waiting to lock <0xf171cda8> (a 
> org.apache.kafka.streams.KafkaStreams)
> at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:438)
> at 
> com.opentable.chat.service.ChatStorage$$Lambda$161/1940967023.close(Unknown 
> Source)
> at 
> com.opentable.chat.service.ChatStorage.closeLog(ChatStorage.java:212)
> at com.opentable.chat.service.ChatStorage.close(ChatStorage.java:207)
> at 
> com.opentable.chat.service.ChatStorage.handleStreamException(ChatStorage.java:541)
> at 
> com.opentable.chat.service.ChatStorage$$Lambda$123/149062221.uncaughtException(Unknown
>  Source)
> at java.lang.Thread.dispatchUncaughtException(Thread.java:1956)
> "main" #1 prio=5 os_prio=0 tid=0x7f5608011000 nid=0x3f76 in Object.wait() 
> [0x7f5610f04000]
>java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at java.lang.Thread.join(Thread.java:1249)
> - locked <0xfd302bf0> (a java.lang.Thread)
> at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:494)
> - locked <0xf171cda8> (a 
> org.apache.kafka.streams.KafkaStreams)
> at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:438)
> at 
> com.opentable.chat.service.ChatStorage$$Lambda$161/1940967023.close(Unknown 
> Source)
> at 
> com.opentable.chat.service.ChatStorage.closeLog(ChatStorage.java:212)
> at com.opentable.chat.service.ChatStorage.close(ChatStorage.java:207)
> {code}
> Note how the main thread calls close(), which encounters an exception.  It 
> uses a StreamThread to dispatch to the handler, which calls close().  Once it 
> tries to take the monitor, we are left in a position where main is joined on 
> StreamThread-1, but StreamThread-1 is waiting for main to release that 
> monitor.
> Arguably it's a bit abusive to call close() in this way (it certainly wasn't 
> intentional) -- but to make Kafka Streams robust it should handle any 
> sequence of close() invocations in particular gracefully.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4787) KafkaStreams close() is not reentrant

2017-02-22 Thread Steven Schlansker (JIRA)
Steven Schlansker created KAFKA-4787:


 Summary: KafkaStreams close() is not reentrant
 Key: KAFKA-4787
 URL: https://issues.apache.org/jira/browse/KAFKA-4787
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 0.10.2.0
Reporter: Steven Schlansker


While building a simple application, I tried to implement a failure policy 
where any uncaught exception terminates the application until an administrator 
can evaluate and intervene:

{code}
/** Handle any uncaught exception by shutting down the program. */
private void handleStreamException(Thread thread, Throwable t) {
LOG.error("stream exception in thread {}", thread, t);
streams.close();
}

streams.setUncaughtExceptionHandler(this::handleStreamException);
streams.start();
{code}

Unfortunately, because the KafkaStreams#close() method takes a lock, this is 
prone to what looks like a deadlock:

{code}
"StreamThread-1" #80 prio=5 os_prio=0 tid=0x7f56096f4000 nid=0x40c8 waiting 
for monitor entry [0x7f54f03ee000]
   java.lang.Thread.State: BLOCKED (on object monitor)
at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java)
- waiting to lock <0xf171cda8> (a 
org.apache.kafka.streams.KafkaStreams)
at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:438)
at 
com.opentable.chat.service.ChatStorage$$Lambda$161/1940967023.close(Unknown 
Source)
at com.opentable.chat.service.ChatStorage.closeLog(ChatStorage.java:212)
at com.opentable.chat.service.ChatStorage.close(ChatStorage.java:207)
at 
com.opentable.chat.service.ChatStorage.handleStreamException(ChatStorage.java:541)
at 
com.opentable.chat.service.ChatStorage$$Lambda$123/149062221.uncaughtException(Unknown
 Source)
at java.lang.Thread.dispatchUncaughtException(Thread.java:1956)

"main" #1 prio=5 os_prio=0 tid=0x7f5608011000 nid=0x3f76 in Object.wait() 
[0x7f5610f04000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1249)
- locked <0xfd302bf0> (a java.lang.Thread)
at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:494)
- locked <0xf171cda8> (a org.apache.kafka.streams.KafkaStreams)
at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:438)
at 
com.opentable.chat.service.ChatStorage$$Lambda$161/1940967023.close(Unknown 
Source)
at com.opentable.chat.service.ChatStorage.closeLog(ChatStorage.java:212)
at com.opentable.chat.service.ChatStorage.close(ChatStorage.java:207)
{code}

Note how the main thread calls close(), which encounters an exception.  It uses 
a StreamThread to dispatch to the handler, which calls close().  Once it tries 
to take the monitor, we are left in a position where main is joined on 
StreamThread-1, but StreamThread-1 is waiting for main to release that monitor.

Arguably it's a bit abusive to call close() in this way (it certainly wasn't 
intentional) -- but to make Kafka Streams robust, it should in particular handle 
any sequence of close() invocations gracefully.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4726) ValueMapper should have (read) access to key

2017-02-02 Thread Steven Schlansker (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Schlansker updated KAFKA-4726:
-
Description: 
{{ValueMapper}} should have read-only access to the key for the value it is 
mapping.  Sometimes the value transformation will depend on the key.

It is possible to do this with a full blown {{KeyValueMapper}} but that loses 
the promise that you won't change the key -- so you might introduce a re-keying 
phase that is totally unnecessary.  It also requires you to return an identity 
KeyValue object which costs something to construct (unless we are lucky and the 
optimizer elides it).

[ If mapValues() is guaranteed to be no less efficient than map() the issue may 
be moot, but I presume there are some optimizations that are valid with the 
former but not latter. ]

  was:
{{ValueMapper}} should have read-only access to the key for the value it is 
mapping.  Sometimes the value transformation will depend on the key.

It is possible to do this with a full blown {{KeyValueMapper}} but that loses 
the promise that you won't change the key -- so you might introduce a re-keying 
phase that is totally unnecessary.

[ If mapValues() is guaranteed to be no less efficient than map() the issue may 
be moot, but I presume there are some optimizations that are valid with the 
former but not latter. ]


> ValueMapper should have (read) access to key
> 
>
> Key: KAFKA-4726
> URL: https://issues.apache.org/jira/browse/KAFKA-4726
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Steven Schlansker
>
> {{ValueMapper}} should have read-only access to the key for the value it is 
> mapping.  Sometimes the value transformation will depend on the key.
> It is possible to do this with a full blown {{KeyValueMapper}} but that loses 
> the promise that you won't change the key -- so you might introduce a 
> re-keying phase that is totally unnecessary.  It also requires you to return 
> an identity KeyValue object which costs something to construct (unless we are 
> lucky and the optimizer elides it).
> [ If mapValues() is guaranteed to be no less efficient than map() the issue 
> may be moot, but I presume there are some optimizations that are valid with 
> the former but not latter. ]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4726) ValueMapper should have (read) access to key

2017-02-02 Thread Steven Schlansker (JIRA)
Steven Schlansker created KAFKA-4726:


 Summary: ValueMapper should have (read) access to key
 Key: KAFKA-4726
 URL: https://issues.apache.org/jira/browse/KAFKA-4726
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 0.10.1.1
Reporter: Steven Schlansker


{{ValueMapper}} should have read-only access to the key for the value it is 
mapping.  Sometimes the value transformation will depend on the key.

It is possible to do this with a full blown {{KeyValueMapper}} but that loses 
the promise that you won't change the key -- so you might introduce a re-keying 
phase that is totally unnecessary.

[ If mapValues() is guaranteed to be no less efficient than map() the issue may 
be moot, but I presume there are some optimizations that are valid with the 
former but not the latter. ]
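
To illustrate the workaround and its cost, a minimal sketch (enrich() is a hypothetical transformation):

{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;

// The workaround described above, sketched with a hypothetical enrich() step:
// map() gives access to the key, but the identity re-keying means Streams can
// no longer assume the key is unchanged.
final class KeyAwareValueMapSketch {
    static KStream<String, String> enrichValues(KStream<String, String> stream) {
        return stream.map((key, value) -> KeyValue.pair(key, enrich(key, value)));
    }

    // What mapValues() with key access would allow instead (not available here):
    //     stream.mapValues(value -> enrich(/* key not visible */, value));

    private static String enrich(String key, String value) {
        return key + ":" + value;   // placeholder transformation
    }
}
{code}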



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4722) StreamThread should allow customization of thread prefix

2017-02-02 Thread Steven Schlansker (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15850243#comment-15850243
 ] 

Steven Schlansker commented on KAFKA-4722:
--

That would be perfect, thanks :)

> StreamThread should allow customization of thread prefix
> 
>
> Key: KAFKA-4722
> URL: https://issues.apache.org/jira/browse/KAFKA-4722
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Steven Schlansker
>
> StreamThread currently sets its name thusly:
> {code}
> super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
> {code}
> When you have multiple KStream / KTables in an application, it would be nice 
> to customize the "StreamThread" prefix.  The id is a good start but a 
> human-recognizable name would make logs much easier to read.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4722) StreamThread should allow customization of thread prefix

2017-02-01 Thread Steven Schlansker (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15849098#comment-15849098
 ] 

Steven Schlansker commented on KAFKA-4722:
--

I have multiple different functional sections in my application.  Each has 
its own KStreamBuilder / KafkaStreams / KStream / KTable instances.  An error 
thrown from one part of the application is very hard to differentiate from the 
others.  For example, right now I have:

{code}
2017-02-01T14:41:22.120Z ERROR [StreamThread-2] 
o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-2] Streams 
application error during processing:  
java.lang.IllegalArgumentException: Assigned partition public.chat.message-0 
for non-subscribed topic.
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:187)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:215)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
{code}

and it is not clear which instance of {{KafkaStreams}} produced this.


> StreamThread should allow customization of thread prefix
> 
>
> Key: KAFKA-4722
> URL: https://issues.apache.org/jira/browse/KAFKA-4722
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Steven Schlansker
>
> StreamThread currently sets its name thusly:
> {code}
> super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
> {code}
> When you have multiple KStream / KTables in an application, it would be nice 
> to customize the "StreamThread" prefix.  The id is a good start but a 
> human-recognizable name would make logs much easier to read.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4722) StreamThread should allow customization of thread prefix

2017-02-01 Thread Steven Schlansker (JIRA)
Steven Schlansker created KAFKA-4722:


 Summary: StreamThread should allow customization of thread prefix
 Key: KAFKA-4722
 URL: https://issues.apache.org/jira/browse/KAFKA-4722
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Affects Versions: 0.10.1.1
Reporter: Steven Schlansker


StreamThread currently sets its name thusly:

{code}
super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
{code}

When you have multiple KStream / KTables in an application, it would be nice to 
customize the "StreamThread" prefix.  The id is a good start but a 
human-recognizable name would make logs much easier to read.
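
A rough sketch of the idea, as a stand-in class rather than the actual StreamThread change:

{code:java}
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in, not the actual StreamThread: let each KafkaStreams
// instance supply a human-readable prefix for its threads.
class PrefixedStreamThreadSketch extends Thread {
    private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);

    PrefixedStreamThreadSketch(final String prefix) {
        super(prefix + "-StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
    }
}
{code}

Logs would then read e.g. [chat-StreamThread-1] instead of an ambiguous [StreamThread-2].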



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4721) KafkaStreams (and possibly others) should inherit Closeable

2017-02-01 Thread Steven Schlansker (JIRA)
Steven Schlansker created KAFKA-4721:


 Summary: KafkaStreams (and possibly others) should inherit 
Closeable
 Key: KAFKA-4721
 URL: https://issues.apache.org/jira/browse/KAFKA-4721
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Affects Versions: 0.10.1.1
Reporter: Steven Schlansker


KafkaStreams should inherit AutoCloseable or Closeable so that you can use 
try-with-resources:

{code}
try (KafkaStreams reader = storage.createStreams(builder)) {
reader.start();
stopCondition.join();
}
{code}
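
Until that happens, one workaround sketch: the no-argument KafkaStreams#close() declares no checked exceptions, so a method reference can adapt it to Closeable for try-with-resources (stopCondition is a placeholder here):

{code:java}
import java.io.Closeable;
import java.io.IOException;
import org.apache.kafka.streams.KafkaStreams;

final class StreamsCloseableSketch {
    static void runUntilStopped(final KafkaStreams streams, final Runnable stopCondition) throws IOException {
        // streams::close is compatible with Closeable's single abstract method,
        // so the streams instance is closed automatically on the way out.
        try (Closeable ignored = streams::close) {
            streams.start();
            stopCondition.run(); // blocks until it is time to shut down
        }
    }
}
{code}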




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4720) Add KStream.peek(ForeachAction<K,V>)

2017-02-01 Thread Steven Schlansker (JIRA)
Steven Schlansker created KAFKA-4720:


 Summary: Add KStream.peek(ForeachAction)
 Key: KAFKA-4720
 URL: https://issues.apache.org/jira/browse/KAFKA-4720
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Affects Versions: 0.10.1.1
Reporter: Steven Schlansker


Java's Stream provides a handy peek method that observes elements in the stream 
without transforming or filtering them.  While you can emulate this 
functionality with either a filter or map, peek provides potentially useful 
semantic information (doesn't modify the stream) and is much more concise.

Example usage: using Dropwizard Metrics to provide event counters

{code}
KStream s = ...;
s.map(this::mungeData)
 .peek((i, s) -> metrics.noteMungedEvent(i, s))
 .filter(this::hadProcessingError)
 .print();
{code}
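
For comparison, the filter-based emulation mentioned above, sketched with placeholder key/value types and a hypothetical metrics hook:

{code:java}
import org.apache.kafka.streams.kstream.KStream;

final class PeekEmulationSketch {
    // The filter-based emulation: observe each record, then keep it unchanged.
    static KStream<Integer, String> observe(final KStream<Integer, String> stream, final Metrics metrics) {
        return stream.filter((i, s) -> {
            metrics.noteMungedEvent(i, s); // side effect only
            return true;                   // never drops anything
        });
    }

    interface Metrics {
        void noteMungedEvent(Integer key, String value);
    }
}
{code}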




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-3921) Periodic refresh of metadata causes spurious log messages

2016-06-30 Thread Steven Schlansker (JIRA)
Steven Schlansker created KAFKA-3921:


 Summary: Periodic refresh of metadata causes spurious log messages
 Key: KAFKA-3921
 URL: https://issues.apache.org/jira/browse/KAFKA-3921
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 0.9.0.1
Reporter: Steven Schlansker


Kafka cluster metadata has a configurable expiry period.  (I don't understand 
why this is -- cluster updates can happen at any time, and we have to pick 
those up quicker than every 10 minutes?  But this ticket isn't about that.)

When this interval expires, the ClientUtils class spins up a SyncProducer, 
which sends a special message to retrieve metadata.  The producer is then 
closed immediately after processing this message.

This causes the SyncProducer to log both a connection open and close at INFO 
level:

{code}
2016-06-30T17:50:19.408Z INFO <> [ProducerSendThread-central-buzzsaw-1-myhost] 
kafka.client.ClientUtils$ - Fetching metadata from broker 
BrokerEndPoint(2,broker-3.mycorp.com,9092) with correlation id 17188 for 1 
topic(s) Set(logstash)
2016-06-30T17:50:19.410Z INFO <> [ProducerSendThread-central-buzzsaw-1-myhost] 
kafka.producer.SyncProducer - Connected to broker-3.mycorp.com:9092 for 
producing
2016-06-30T17:50:19.411Z INFO <> [ProducerSendThread-central-buzzsaw-1-myhost] 
kafka.producer.SyncProducer - Disconnecting from broker-3.mycorp.com:9092
2016-06-30T17:50:19.411Z INFO <> [ProducerSendThread-central-buzzsaw-1-myhost] 
kafka.producer.SyncProducer - Disconnecting from broker-14.mycorp.com:9092
2016-06-30T17:50:19.411Z INFO <> [ProducerSendThread-central-buzzsaw-1-myhost] 
kafka.producer.SyncProducer - Disconnecting from 
broker-logkafka-13.mycorp.com:9092
2016-06-30T17:50:19.411Z INFO <> [ProducerSendThread-central-buzzsaw-1-myhost] 
kafka.producer.SyncProducer - Disconnecting from broker-12.mycorp.com:9092
2016-06-30T17:50:19.413Z INFO <> [ProducerSendThread-central-buzzsaw-1-myhost] 
kafka.producer.SyncProducer - Connected to broker-12.mycorp.com:9092 for 
producing
{code}

When you are reading the logs, this appears periodically.  We've had more than 
one administrator conclude that the cluster is unhealthy and that client 
connections are getting dropped -- it's disconnecting from the broker so 
frequently!  What is wrong???  But in reality, it is just this harmless / 
expected metadata update.

Can we tweak the log levels so that the periodic background refresh does not 
log unless something goes wrong?  The log messages are misleading and easy to 
misinterpret.  I had to read the code pretty thoroughly to convince myself that 
these messages are actually harmless.
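
Until the levels are changed upstream, an operator-side workaround sketch (this assumes a log4j 1.x backend; the logger names are taken from the output above):

{code:java}
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

final class QuietMetadataRefreshSketch {
    // Silence the routine connect/disconnect chatter from the periodic metadata
    // refresh while keeping warnings and errors visible.
    static void quietSyncProducerLogs() {
        Logger.getLogger("kafka.producer.SyncProducer").setLevel(Level.WARN);
        Logger.getLogger("kafka.client.ClientUtils$").setLevel(Level.WARN);
    }
}
{code}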



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)