[jira] [Commented] (KAFKA-6378) NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper returns null

2018-01-10 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16321832#comment-16321832
 ] 

Matthias J. Sax commented on KAFKA-6378:


Can you open a PR? If yes, please update the JavaDocs accordingly and also add 
a corresponding test.

> NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper 
> returns null
> --
>
> Key: KAFKA-6378
> URL: https://issues.apache.org/jira/browse/KAFKA-6378
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Andy Bryant
>
> On a Stream->GlobalKTable leftJoin if the KeyValueMapper returns null, the 
> stream fails with a NullPointerException (see stacktrace below). On Kafka 
> 0.11.0.0 the stream processes this successfully, calling the ValueJoiner with 
> the table value set to null.
> The use-case for this is joining a stream to a table containing reference 
> data where the stream foreign key may be null. There is no straight-forward 
> workaround in this case with Kafka 1.0.0 without having to resort to either 
> generating a key that will never match or branching the stream for records 
> that don't have the foreign key.
> Exception in thread "workshop-simple-example-client-StreamThread-1" 
> java.lang.NullPointerException
>   at java.base/java.util.Objects.requireNonNull(Objects.java:221)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:136)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:35)
>   at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:184)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:116)
>   at 
> org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
>   at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>   at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
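For illustration, a minimal sketch of the join shape described in the quoted issue above. The topic names ("orders", "customers"), value types, and field names are made up; the point is only that the KeyValueMapper may return null when a record carries no foreign key:
{code}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

public class LeftJoinSketch {

    static class Order { String customerId; }          // customerId may be null
    static class Customer { String name; }
    static class EnrichedOrder {
        EnrichedOrder(final Order order, final Customer customer) { }
    }

    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        final KStream<String, Order> orders = builder.stream("orders");
        final GlobalKTable<String, Customer> customers = builder.globalTable("customers");

        // The KeyValueMapper extracts the foreign key from the stream record.
        // On 1.0.0 a null mapped key reaches the state store and triggers the NPE above;
        // on 0.11.0.0 the ValueJoiner is called with a null table value instead.
        orders.leftJoin(customers,
                (orderId, order) -> order.customerId,
                (order, customer) -> new EnrichedOrder(order, customer));
    }
}
{code}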



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Issue Comment Deleted] (KAFKA-6441) FetchRequest populates buffer of size MinBytes, even if response is smaller

2018-01-10 Thread Ivan Babrou (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Babrou updated KAFKA-6441:
---
Comment: was deleted

(was: With 0.10.2.0 consumer API Sarama is able to get multiple messages in one 
FetchResponse.

It doesn't seem right to get only one with 0.11.0.0 API.)

> FetchRequest populates buffer of size MinBytes, even if response is smaller
> ---
>
> Key: KAFKA-6441
> URL: https://issues.apache.org/jira/browse/KAFKA-6441
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.1
>Reporter: Ivan Babrou
>
> We're using the Sarama Go client as consumer, but I don't think it's relevant. 
> The producer is syslog-ng with Kafka output; I'm not quite sure which log format 
> Kafka itself is using, but I assume 0.11.0.0, because that's what is set 
> in the topic settings.
> Our FetchRequest has minSize = 16MB, maxSize = 64, maxWait = 500ms. For a 
> silly reason, Kafka decides to reply with a buffer of at least minSize even 
> though it contains just one 1KB log message. When Sarama was using the older 
> consumer API, everything was okay. When we upgraded to the 0.11.0.0 consumer 
> API, consumer traffic for a 125Mbit/s topic spiked to 55000Mbit/s on the wire 
> and the consumer wasn't even able to keep up.
> A 1KB message in a 16MB buffer is 1,600,000% overhead.
> I don't think there's any valid reason to do this.
> It's also mildly annoying that there is no tag 0.11.0.1 in git; looking at 
> changes is harder than it should be.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6441) FetchRequest populates buffer of size MinBytes, even if response is smaller

2018-01-10 Thread Ivan Babrou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16321715#comment-16321715
 ] 

Ivan Babrou commented on KAFKA-6441:


With 0.10.2.0 consumer API Sarama is able to get multiple messages in one 
FetchResponse.

It doesn't seem right to get only one with 0.11.0.0 API.

> FetchRequest populates buffer of size MinBytes, even if response is smaller
> ---
>
> Key: KAFKA-6441
> URL: https://issues.apache.org/jira/browse/KAFKA-6441
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.1
>Reporter: Ivan Babrou
>
> We're using the Sarama Go client as consumer, but I don't think it's relevant. 
> The producer is syslog-ng with Kafka output; I'm not quite sure which log format 
> Kafka itself is using, but I assume 0.11.0.0, because that's what is set 
> in the topic settings.
> Our FetchRequest has minSize = 16MB, maxSize = 64, maxWait = 500ms. For a 
> silly reason, Kafka decides to reply with a buffer of at least minSize even 
> though it contains just one 1KB log message. When Sarama was using the older 
> consumer API, everything was okay. When we upgraded to the 0.11.0.0 consumer 
> API, consumer traffic for a 125Mbit/s topic spiked to 55000Mbit/s on the wire 
> and the consumer wasn't even able to keep up.
> A 1KB message in a 16MB buffer is 1,600,000% overhead.
> I don't think there's any valid reason to do this.
> It's also mildly annoying that there is no tag 0.11.0.1 in git; looking at 
> changes is harder than it should be.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6441) FetchRequest populates buffer of size MinBytes, even if response is smaller

2018-01-10 Thread Ivan Babrou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16321716#comment-16321716
 ] 

Ivan Babrou commented on KAFKA-6441:


With 0.10.2.0 consumer API Sarama is able to get multiple messages in one 
FetchResponse.

It doesn't seem right to get only one with 0.11.0.0 API.

> FetchRequest populates buffer of size MinBytes, even if response is smaller
> ---
>
> Key: KAFKA-6441
> URL: https://issues.apache.org/jira/browse/KAFKA-6441
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.1
>Reporter: Ivan Babrou
>
> We're using the Sarama Go client as consumer, but I don't think it's relevant. 
> The producer is syslog-ng with Kafka output; I'm not quite sure which log format 
> Kafka itself is using, but I assume 0.11.0.0, because that's what is set 
> in the topic settings.
> Our FetchRequest has minSize = 16MB, maxSize = 64, maxWait = 500ms. For a 
> silly reason, Kafka decides to reply with a buffer of at least minSize even 
> though it contains just one 1KB log message. When Sarama was using the older 
> consumer API, everything was okay. When we upgraded to the 0.11.0.0 consumer 
> API, consumer traffic for a 125Mbit/s topic spiked to 55000Mbit/s on the wire 
> and the consumer wasn't even able to keep up.
> A 1KB message in a 16MB buffer is 1,600,000% overhead.
> I don't think there's any valid reason to do this.
> It's also mildly annoying that there is no tag 0.11.0.1 in git; looking at 
> changes is harder than it should be.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6441) FetchRequest populates buffer of size MinBytes, even if response is smaller

2018-01-10 Thread Ivan Babrou (JIRA)
Ivan Babrou created KAFKA-6441:
--

 Summary: FetchRequest populates buffer of size MinBytes, even if 
response is smaller
 Key: KAFKA-6441
 URL: https://issues.apache.org/jira/browse/KAFKA-6441
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.11.0.1
Reporter: Ivan Babrou


We're using the Sarama Go client as consumer, but I don't think it's relevant. 
The producer is syslog-ng with Kafka output; I'm not quite sure which log format 
Kafka itself is using, but I assume 0.11.0.0, because that's what is set in 
the topic settings.

Our FetchRequest has minSize = 16MB, maxSize = 64, maxWait = 500ms. For a silly 
reason, Kafka decides to reply with a buffer of at least minSize even though it 
contains just one 1KB log message. When Sarama was using the older consumer API, 
everything was okay. When we upgraded to the 0.11.0.0 consumer API, consumer 
traffic for a 125Mbit/s topic spiked to 55000Mbit/s on the wire and the consumer 
wasn't even able to keep up.

A 1KB message in a 16MB buffer is 1,600,000% overhead.

I don't think there's any valid reason to do this.

It's also mildly annoying that there is no tag 0.11.0.1 in git; looking at 
changes is harder than it should be.
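For reference, a rough sketch of the corresponding fetch settings on the Java consumer. The bootstrap address is hypothetical, and maxSize from the report is assumed to mean 64MB here:
{code}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FetchSizingSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // hypothetical
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "fetch-sizing-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // The fetch parameters from the report, expressed as Java consumer configs:
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 16 * 1024 * 1024);  // minSize = 16MB
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 64 * 1024 * 1024);  // maxSize, assumed 64MB
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);             // maxWait = 500ms

        final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        consumer.close();
    }
}
{code}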



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6265) GlobalKTable missing #queryableStoreName()

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16321568#comment-16321568
 ] 

ASF GitHub Bot commented on KAFKA-6265:
---

ConcurrencyPractitioner opened a new pull request #4413: [KAFKA-6265] 
GlobalKTable missing #queryableStoreName()
URL: https://github.com/apache/kafka/pull/4413
 
 
   A spinoff of original pull request #4340 for resolving conflicts. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> GlobalKTable missing #queryableStoreName()
> --
>
> Key: KAFKA-6265
> URL: https://issues.apache.org/jira/browse/KAFKA-6265
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Antony Stubbs
>Assignee: Richard Yu
>  Labels: beginner, needs-kip, newbie
> Fix For: 1.1.0
>
>
> KTable has the nicely useful #queryableStoreName(), it seems to be missing 
> from GlobalKTable
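For context, a rough sketch of how the new method would be used once added, assuming it mirrors {{KTable#queryableStoreName()}}. The topic name, bootstrap address, and key are hypothetical:
{code}
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class GlobalKTableQuerySketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final GlobalKTable<String, String> table = builder.globalTable("reference-data");

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "global-ktable-query-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // hypothetical

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // The proposed method: returns the backing store name, or null if not queryable.
        final String storeName = table.queryableStoreName();
        if (storeName != null) {
            final ReadOnlyKeyValueStore<String, String> store =
                    streams.store(storeName, QueryableStoreTypes.keyValueStore());
            System.out.println(store.get("some-key"));
        }

        streams.close();
    }
}
{code}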



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6265) GlobalKTable missing #queryableStoreName()

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16321551#comment-16321551
 ] 

ASF GitHub Bot commented on KAFKA-6265:
---

ConcurrencyPractitioner closed pull request #4412: [KAFKA-6265] GlobalKTable 
missing #queryableStoreName()
URL: https://github.com/apache/kafka/pull/4412
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/streams/developer-guide.html 
b/docs/streams/developer-guide.html
index 5730f5330e1..20d01f2c148 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -2294,22 +2294,22 @@ 
 
 
-  // Get the window store named "CountsWindowStore"
-  ReadOnlyWindowStore<String, Long> windowStore =
-  streams.store("CountsWindowStore", 
QueryableStoreTypes.windowStore());
-
-  // Fetch values for the key "world" for all of the windows available 
in this application instance.
-  // To get *all* available windows we fetch windows from the 
beginning of time until now.
-  long timeFrom = 0; // beginning of time = oldest available
-  long timeTo = System.currentTimeMillis(); // now (in processing-time)
-  WindowStoreIterator<Long> iterator = 
windowStore.fetch("world", timeFrom, timeTo);
-  while (iterator.hasNext()) {
+ // Get the window store named "CountsWindowStore"
+ ReadOnlyWindowStore<String, Long> windowStore =
+ streams.store("CountsWindowStore", 
QueryableStoreTypes.windowStore());
+
+ // Fetch values for the key "world" for all of the windows available 
in this application instance.
+ // To get *all* available windows we fetch windows from the beginning 
of time until now.
+ long timeFrom = 0; // beginning of time = oldest available
+ long timeTo = System.currentTimeMillis(); // now (in processing-time)
+ WindowStoreIterator<Long> iterator = windowStore.fetch("world", 
timeFrom, timeTo);
+ while (iterator.hasNext()) {
 KeyValue<Long, Long> next = iterator.next();
 long windowTimestamp = next.key;
 System.out.println("Count of 'world' @ time " + windowTimestamp + 
" is " + next.value);
-  }
-  iterator.close();
-
+ }
+ iterator.close();
+
 
 Querying 
local custom state stores
 
@@ -3023,4 +3023,4 @@ Executing Your Kafka Streams
   // Display docs subnav items
   $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
 });
-
\ No newline at end of file
+
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 297405802ca..cd5ad433159 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -44,6 +44,15 @@ Upgrade Guide  API Changes
 See below a complete list of 
0.10.1 API changes that allow you to advance your application and/or simplify 
your code base, including the usage of new features.
 
 
+
+Streams API changes in 1.1.0
+
+   New method in GlobalKTable
+
+
+A method has been provided such that it will return the store name 
associated with the GlobalKTable or null if the store 
name is non-queryable. 
+
+
 Streams API changes in 1.0.0
 
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
index 72286c20529..e58f67fc5b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
@@ -67,4 +67,10 @@
  */
 @InterfaceStability.Evolving
 public interface GlobalKTable<K, V> {
+/**
+ * Get the name of the local state store that can be used to query this 
{@code GlobalKTable}.
+ *
+ * @return the underlying state store name, or {@code null} if this {@code 
GlobalKTable} cannot be queried.
+ */
+String queryableStoreName();
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
index 34e23752444..8fcdfed1e52 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
@@ -21,13 +21,29 @@
 public class GlobalKTableImpl<K, V> implements GlobalKTable<K, V> {
 
 private final KTableValueGetterSupplier<K, V> valueGetterSupplier;
+private final boolean queryable;
 
 public GlobalKTableImpl(final KTableValueGetterSupplier 

[jira] [Commented] (KAFKA-6378) NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper returns null

2018-01-10 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16321393#comment-16321393
 ] 

Ted Yu commented on KAFKA-6378:
---

Here is the two-line change:
{code}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
 b/streams/src/main/java/org/apache/kafka/streams/kstream/internals
index bac930d..dd7877b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
@@ -53,7 +53,8 @@ class KStreamKTableJoinProcessor extends AbstractProcessor
{code}

> NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper 
> returns null
> --
>
> Key: KAFKA-6378
> URL: https://issues.apache.org/jira/browse/KAFKA-6378
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Andy Bryant
>
> On a Stream->GlobalKTable leftJoin if the KeyValueMapper returns null, the 
> stream fails with a NullPointerException (see stacktrace below). On Kafka 
> 0.11.0.0 the stream processes this successfully, calling the ValueJoiner with 
> the table value set to null.
> The use-case for this is joining a stream to a table containing reference 
> data where the stream foreign key may be null. There is no straight-forward 
> workaround in this case with Kafka 1.0.0 without having to resort to either 
> generating a key that will never match or branching the stream for records 
> that don't have the foreign key.
> Exception in thread "workshop-simple-example-client-StreamThread-1" 
> java.lang.NullPointerException
>   at java.base/java.util.Objects.requireNonNull(Objects.java:221)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:136)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:35)
>   at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:184)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:116)
>   at 
> org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
>   at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>   at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6378) NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper returns null

2018-01-10 Thread Andy Bryant (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16321374#comment-16321374
 ] 

Andy Bryant commented on KAFKA-6378:


A sentinel value is ok where only a subset of the available values for a type 
are valid, but it does seem messy to have to convert {{null}} values to the 
sentinel before the join then back to {{null}} again in the merge function 
afterwards.

Also it doesn't cater for the case where you can't pick a sentinel because all 
values of a type are valid.

Since, as Matthias pointed out, {{null}} can never be a valid key, explicitly 
calling it out in the docs as indicating no match and updating the code so it 
doesn't crash (a two-line change by the looks of it) seems like a nice clean 
approach to me.
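For illustration only, a self-contained sketch of that kind of guard, with plain Java maps standing in for the global table and the mapper (this is not the actual Streams code):
{code}
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class NullForeignKeyGuardSketch {
    public static void main(final String[] args) {
        final Map<String, String> globalTable = new HashMap<>();
        globalTable.put("c1", "Customer 1");

        // Stand-in for the KeyValueMapper: the stream value itself is the foreign key
        // and may be null.
        final BiFunction<String, String, String> keyMapper = (key, value) -> value;

        final String streamKey = "order-42";
        final String streamValue = null; // this record carries no foreign key

        // The guard: a null mapped key is treated as "no match" instead of being
        // passed to the store lookup (which is what currently throws the NPE).
        final String mappedKey = keyMapper.apply(streamKey, streamValue);
        final String tableValue = (mappedKey == null) ? null : globalTable.get(mappedKey);
        System.out.println("left join value for " + streamKey + ": " + tableValue);
    }
}
{code}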


> NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper 
> returns null
> --
>
> Key: KAFKA-6378
> URL: https://issues.apache.org/jira/browse/KAFKA-6378
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Andy Bryant
>
> On a Stream->GlobalKTable leftJoin if the KeyValueMapper returns null, the 
> stream fails with a NullPointerException (see stacktrace below). On Kafka 
> 0.11.0.0 the stream processes this successfully, calling the ValueJoiner with 
> the table value set to null.
> The use-case for this is joining a stream to a table containing reference 
> data where the stream foreign key may be null. There is no straight-forward 
> workaround in this case with Kafka 1.0.0 without having to resort to either 
> generating a key that will never match or branching the stream for records 
> that don't have the foreign key.
> Exception in thread "workshop-simple-example-client-StreamThread-1" 
> java.lang.NullPointerException
>   at java.base/java.util.Objects.requireNonNull(Objects.java:221)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:136)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:35)
>   at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:184)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:116)
>   at 
> org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
>   at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>   at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-6205) Have State Stores Restore Before Initializing Topology

2018-01-10 Thread Bill Bejeck (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck reassigned KAFKA-6205:
--

Assignee: Bill Bejeck

> Have State Stores Restore Before Initializing Topology
> -
>
> Key: KAFKA-6205
> URL: https://issues.apache.org/jira/browse/KAFKA-6205
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0, 0.11.0.2
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
> Fix For: 1.0.1, 0.11.0.3
>
>
> Streams should restore state stores (if needed) before initializing the 
> topology.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-4969) State-store workload-aware StreamsPartitionAssignor

2018-01-10 Thread Bill Bejeck (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck reassigned KAFKA-4969:
--

Assignee: Bill Bejeck

> State-store workload-aware StreamsPartitionAssignor
> ---
>
> Key: KAFKA-4969
> URL: https://issues.apache.org/jira/browse/KAFKA-4969
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Bill Bejeck
>
> Currently, {{StreamPartitionsAssigner}} does not distinguish different 
> "types" of tasks. For example, a task can be stateless or have one or multiple 
> stores.
> This can lead to a suboptimal task placement: assume there are 2 stateless 
> and 2 stateful tasks and the app is running with 2 instances. To share the 
> "store load" it would be good to place one stateless and one stateful task 
> per instance. Right now, there is no guarantee about this, and it can happen 
> that one instance processes both stateless tasks while the other processes 
> both stateful tasks.
> We should improve {{StreamPartitionAssignor}} and introduce "task types" 
> including a cost model for task placement. We should consider the following 
> parameters:
>  - number of stores
>  - number of sources/sinks
>  - number of processors
>  - regular task vs standby task
> This improvement should be backed by a design document in the project wiki 
> (no KIP required though) as it's a fairly complex change.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4969) State-store workload-aware StreamsPartitionAssignor

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16321183#comment-16321183
 ] 

ASF GitHub Bot commented on KAFKA-4969:
---

bbejeck opened a new pull request #4410: KAFKA-4969: attempt to evenly 
distribute load of tasks
URL: https://github.com/apache/kafka/pull/4410
 
 
   This PR is an initial attempt to evenly distribute tasks with heavy 
processing across clients using a somewhat naive approach.
   
   The rationale is that if each client's assignment is not comprised entirely of 
tasks from the same `topicGroupId`, 
   then when there is one sub-topology doing heavy processing and another 
sub-topology that is relatively light, the processing load is somewhat evenly 
distributed (a rough sketch of this interleaving idea follows the checklist below).
   
   This process only looks at active tasks; standby tasks are not given this 
consideration as we can end up in a state where clients have the same task 
assignments i.e [aT1, sT2] [aT2, sT1].
   
   We plan to do a follow-on task at a later date where we weigh tasks with 
state stores to
   distribute tasks with state stores evenly.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
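A rough, self-contained sketch of the interleaving idea referenced above. The task ids and client count are made up; this is not the assignor code:
{code}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class InterleavedAssignmentSketch {
    public static void main(final String[] args) {
        // Task ids encoded as "topicGroupId_partition"; three sub-topologies, two clients.
        final List<String> tasks = Arrays.asList("0_0", "0_1", "1_0", "1_1", "2_0", "2_1");
        final int numClients = 2;

        // Group tasks by sub-topology (topicGroupId).
        final Map<String, List<String>> bySubtopology = new TreeMap<>();
        for (final String task : tasks) {
            bySubtopology.computeIfAbsent(task.split("_")[0], k -> new ArrayList<>()).add(task);
        }

        // Interleave one task from each sub-topology at a time.
        final List<String> interleaved = new ArrayList<>();
        while (interleaved.size() < tasks.size()) {
            for (final List<String> group : bySubtopology.values()) {
                if (!group.isEmpty()) {
                    interleaved.add(group.remove(0));
                }
            }
        }

        // Hand out the interleaved list round-robin, so each client gets a mix of sub-topologies.
        for (int i = 0; i < interleaved.size(); i++) {
            System.out.println("client-" + (i % numClients) + " <- task " + interleaved.get(i));
        }
    }
}
{code}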


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> State-store workload-aware StreamsPartitionAssignor
> ---
>
> Key: KAFKA-4969
> URL: https://issues.apache.org/jira/browse/KAFKA-4969
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>
> Currently, {{StreamPartitionsAssigner}} does not distinguish different 
> "types" of tasks. For example, a task can be stateless or have one or multiple 
> stores.
> This can lead to a suboptimal task placement: assume there are 2 stateless 
> and 2 stateful tasks and the app is running with 2 instances. To share the 
> "store load" it would be good to place one stateless and one stateful task 
> per instance. Right now, there is no guarantee about this, and it can happen 
> that one instance processes both stateless tasks while the other processes 
> both stateful tasks.
> We should improve {{StreamPartitionAssignor}} and introduce "task types" 
> including a cost model for task placement. We should consider the following 
> parameters:
>  - number of stores
>  - number of sources/sinks
>  - number of processors
>  - regular task vs standby task
> This improvement should be backed by a design document in the project wiki 
> (no KIP required though) as it's a fairly complex change.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4711) Change Default unclean.leader.election.enabled from True to False (KIP-106)

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16321146#comment-16321146
 ] 

ASF GitHub Bot commented on KAFKA-4711:
---

junrao closed pull request #3567: KAFKA-4711: fix docs 
onunclean.leader.election.enable default
URL: https://github.com/apache/kafka/pull/3567
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/design.html b/docs/design.html
index 564df386db7..69d1941effd 100644
--- a/docs/design.html
+++ b/docs/design.html
@@ -238,8 +238,8 @@ 4.6 Message 
Delivery Semantics
 can fail, cases where there are multiple consumer processes, or cases 
where data written to disk can be lost).
 
 Kafka's semantics are straight-forward. When publishing a message we have 
a notion of the message being "committed" to the log. Once a published message 
is committed it will not be lost as long as one broker that
-replicates the partition to which this message was written remains 
"alive". The definition of committed message, alive partition as well as a 
description of which types of failures we attempt to handle will be 
-described in more detail in the next section. For now let's assume a 
perfect, lossless broker and try to understand the guarantees to the producer 
and consumer. If a producer attempts to publish a message and 
+replicates the partition to which this message was written remains 
"alive". The definition of committed message, alive partition as well as a 
description of which types of failures we attempt to handle will be
+described in more detail in the next section. For now let's assume a 
perfect, lossless broker and try to understand the guarantees to the producer 
and consumer. If a producer attempts to publish a message and
 experiences a network error it cannot be sure if this error happened 
before or after the message was committed. This is similar to the semantics of 
inserting into a database table with an autogenerated key.
 
 Prior to 0.11.0.0, if a producer failed to receive a response indicating 
that a message was committed, it had little choice but to resend the message. 
This provides at-least-once delivery semantics since the
@@ -309,11 +309,11 @@ 4.7 
Replication
 handle so-called "Byzantine" failures in which nodes produce arbitrary or 
malicious responses (perhaps due to bugs or foul play).
 
 We can now more precisely define that a message is considered committed 
when all in sync replicas for that partition have applied it to their log.
-Only committed messages are ever given out to the consumer. This means 
that the consumer need not worry about potentially seeing a message that could 
be lost if the leader fails. Producers, on the other hand, 
-have the option of either waiting for the message to be committed or not, 
depending on their preference for tradeoff between latency and durability. This 
preference is controlled by the acks setting that the 
+Only committed messages are ever given out to the consumer. This means 
that the consumer need not worry about potentially seeing a message that could 
be lost if the leader fails. Producers, on the other hand,
+have the option of either waiting for the message to be committed or not, 
depending on their preference for tradeoff between latency and durability. This 
preference is controlled by the acks setting that the
 producer uses.
 Note that topics have a setting for the "minimum number" of in-sync 
replicas that is checked when the producer requests acknowledgment that a 
message
-has been written to the full set of in-sync replicas. If a less stringent 
acknowledgement is requested by the producer, then the message can be 
committed, and consumed, 
+has been written to the full set of in-sync replicas. If a less stringent 
acknowledgement is requested by the producer, then the message can be 
committed, and consumed,
 even if the number of in-sync replicas is lower than the minimum (e.g. it 
can be as low as just the leader).
 
 The guarantee that Kafka offers is that a committed message will not be 
lost, as long as there is at least one in sync replica alive, at all times.
@@ -384,8 +384,8 @@ Unclean leader ele
 
 This is a simple tradeoff between availability and consistency. If we wait 
for replicas in the ISR, then we will remain unavailable as long as those 
replicas are down. If such replicas were destroyed or their data
 was lost, then we are permanently down. If, on the other hand, a 
non-in-sync replica comes back to life and we allow it to become leader, then 
its log becomes the source of truth even though it is not guaranteed to
-have every 

[jira] [Updated] (KAFKA-6440) Expose Connect leader via REST

2018-01-10 Thread Ryan P (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan P updated KAFKA-6440:
--
Description: 
[KIP-196|https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework]
 adds a metric to expose the current leader of a connect cluster. It would be 
helpful to make this information available via REST, along with a list of all 
the cluster's members.   (was: 
[KIP-196|https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework]
 adds a metric to expose the current leader of a connect cluster. It would be 
helpful to make this information available via REST as well as it would not 
require the use of a JMX client. 

In ad)

> Expose Connect leader via REST
> --
>
> Key: KAFKA-6440
> URL: https://issues.apache.org/jira/browse/KAFKA-6440
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Ryan P
>Priority: Minor
>  Labels: needs-kip
>
> [KIP-196|https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework]
>  adds a metric to expose the current leader of a connect cluster. It would be 
> helpful to make this information available via REST, along with a list of all 
> the cluster's members. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6440) Expose Connect leader via REST

2018-01-10 Thread Ryan P (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan P updated KAFKA-6440:
--
Description: 
[KIP-196|https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework]
 adds a metric to expose the current leader of a connect cluster. It would be 
helpful to make this information available via REST as well as it would not 
require the use of a JMX client. 

In ad

  
was:[KIP-196|https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework]
 adds a metric to expose the current leader of a connect cluster. It would be 
helpful to make this information available via REST as well as it would not 
require the use of a JMX client. 


> Expose Connect leader via REST
> --
>
> Key: KAFKA-6440
> URL: https://issues.apache.org/jira/browse/KAFKA-6440
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Ryan P
>Priority: Minor
>  Labels: needs-kip
>
> [KIP-196|https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework]
>  adds a metric to expose the current leader of a connect cluster. It would be 
> helpful to make this information available via REST as well as it would not 
> require the use of a JMX client. 
> In ad



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6440) Expose Connect leader via REST

2018-01-10 Thread Ryan P (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16321028#comment-16321028
 ] 

Ryan P commented on KAFKA-6440:
---

[~rhauch] +1, editing description to accommodate this 

> Expose Connect leader via REST
> --
>
> Key: KAFKA-6440
> URL: https://issues.apache.org/jira/browse/KAFKA-6440
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Ryan P
>Priority: Minor
>  Labels: needs-kip
>
> [KIP-196|https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework]
>  adds a metric to expose the current leader of a connect cluster. It would be 
> helpful to make this information available via REST as well as it would not 
> require the use of a JMX client. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6440) Expose Connect leader via REST

2018-01-10 Thread Ryan P (JIRA)
Ryan P created KAFKA-6440:
-

 Summary: Expose Connect leader via REST
 Key: KAFKA-6440
 URL: https://issues.apache.org/jira/browse/KAFKA-6440
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 1.0.0
Reporter: Ryan P
Priority: Minor


[KIP-196|https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework]
 adds a metric to expose the current leader of a connect cluster. It would be 
helpful to make this information available via REST as well as it would not 
require the use of a JMX client. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6440) Expose Connect leader via REST

2018-01-10 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-6440:
-
Labels: needs-kip  (was: )

> Expose Connect leader via REST
> --
>
> Key: KAFKA-6440
> URL: https://issues.apache.org/jira/browse/KAFKA-6440
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Ryan P
>Priority: Minor
>  Labels: needs-kip
>
> [KIP-196|https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework]
>  adds a metric to expose the current leader of a connect cluster. It would be 
> helpful to make this information available via REST as well as it would not 
> require the use of a JMX client. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6435) Application Reset Tool might delete incorrect internal topics

2018-01-10 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6435:
-
Labels: bug  (was: )

> Application Reset Tool might delete incorrect internal topics
> -
>
> Key: KAFKA-6435
> URL: https://issues.apache.org/jira/browse/KAFKA-6435
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>  Labels: bug
>
> The streams application reset tool deletes all topics that start with 
> {{<application.id>-}}.
> If people have two versions of the same application and name them {{"app"}} 
> and {{"app-v2"}}, resetting {{"app"}} would also delete the internal topics 
> of {{"app-v2"}}.
> We either need to disallow the dash in the application ID, or improve the 
> topic identification logic in the reset tool to fix this.
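A small, self-contained sketch of the ambiguity described above (the topic names are made up):
{code}
public class ResetToolPrefixSketch {
    public static void main(final String[] args) {
        final String applicationId = "app";
        final String[] internalTopics = {
                "app-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog",    // belongs to "app"
                "app-v2-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog"  // belongs to "app-v2"
        };
        for (final String topic : internalTopics) {
            // Both topics match the prefix, so resetting "app" would also hit "app-v2" internals.
            System.out.println(topic + " matches: " + topic.startsWith(applicationId + "-"));
        }
    }
}
{code}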



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6412) Improve synchronization in CachingKeyValueStore methods

2018-01-10 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6412:
-
Labels:   (was: bug)

> Improve synchronization in CachingKeyValueStore methods
> ---
>
> Key: KAFKA-6412
> URL: https://issues.apache.org/jira/browse/KAFKA-6412
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ted Yu
>Assignee: Ted Yu
> Fix For: 1.1.0
>
> Attachments: 6412-jmh.v1.txt, k-6412.v1.txt
>
>
> Currently CachingKeyValueStore methods are synchronized at method level.
> It seems we can use read lock for getter and write lock for put / delete 
> methods.
> For getInternal(), if the underlying thread is streamThread, the 
> getInternal() may trigger eviction. This can be handled by obtaining write 
> lock at the beginning of the method for streamThread.
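A minimal sketch of that locking scheme, with a plain map standing in for the cache (this is not the actual CachingKeyValueStore code): readers share the read lock, writers take the write lock, and a caller that may trigger eviction (the stream thread in the description) takes the write lock up front.
{code}
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockedCacheSketch<K, V> {
    private final Map<K, V> cache = new HashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    public V get(final K key, final boolean mayTriggerEviction) {
        if (mayTriggerEviction) {
            // e.g. the stream thread, whose read may evict and thus mutate the cache
            lock.writeLock().lock();
            try {
                return cache.get(key);
            } finally {
                lock.writeLock().unlock();
            }
        }
        lock.readLock().lock();
        try {
            return cache.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }

    public void put(final K key, final V value) {
        lock.writeLock().lock();
        try {
            cache.put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }
}
{code}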



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6412) Improve synchronization in CachingKeyValueStore methods

2018-01-10 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6412:
-
Labels: bug  (was: )

> Improve synchronization in CachingKeyValueStore methods
> ---
>
> Key: KAFKA-6412
> URL: https://issues.apache.org/jira/browse/KAFKA-6412
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ted Yu
>Assignee: Ted Yu
> Fix For: 1.1.0
>
> Attachments: 6412-jmh.v1.txt, k-6412.v1.txt
>
>
> Currently CachingKeyValueStore methods are synchronized at method level.
> It seems we can use read lock for getter and write lock for put / delete 
> methods.
> For getInternal(), if the underlying thread is streamThread, the 
> getInternal() may trigger eviction. This can be handled by obtaining write 
> lock at the beginning of the method for streamThread.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6437) Streams does not warn about missing input topics, but hangs

2018-01-10 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6437:
-
Labels: newbie  (was: )

> Streams does not warn about missing input topics, but hangs
> ---
>
> Key: KAFKA-6437
> URL: https://issues.apache.org/jira/browse/KAFKA-6437
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
> Environment: Single client on single node broker
>Reporter: Chris Schwarzfischer
>Priority: Minor
>  Labels: newbie
>
> *Case*
> Streams application with two input topics being used for a left join.
> When the left side topic is missing upon starting the streams application, it 
> hangs "in the middle" of the topology (at …9, see below). Only parts of 
> the intermediate topics are created (up to …9)
> When the missing input topic is created, the streams application resumes 
> processing.
> {noformat}
> Topology:
> StreamsTask taskId: 2_0
>   ProcessorTopology:
>   KSTREAM-SOURCE-11:
>   topics: 
> [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition]
>   children:   [KTABLE-AGGREGATE-12]
>   KTABLE-AGGREGATE-12:
>   states: 
> [KTABLE-AGGREGATE-STATE-STORE-09]
>   children:   [KTABLE-TOSTREAM-20]
>   KTABLE-TOSTREAM-20:
>   children:   [KSTREAM-SINK-21]
>   KSTREAM-SINK-21:
>   topic:  data_udr_month_customer_aggregration
>   KSTREAM-SOURCE-17:
>   topics: 
> [mystreams_app-KSTREAM-MAP-14-repartition]
>   children:   [KSTREAM-LEFTJOIN-18]
>   KSTREAM-LEFTJOIN-18:
>   states: 
> [KTABLE-AGGREGATE-STATE-STORE-09]
>   children:   [KSTREAM-SINK-19]
>   KSTREAM-SINK-19:
>   topic:  data_UDR_joined
> Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, 
> mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0]
> {noformat}
> *Why this matters*
> The application does quite a lot of preprocessing before joining with the 
> missing input topic. This preprocessing won't happen without the topic, 
> creating a huge backlog of data.
> *Fix*
> Issue a `warn` or `error` level message at start to inform about the missing 
> topic and its consequences.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6412) Improve synchronization in CachingKeyValueStore methods

2018-01-10 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320898#comment-16320898
 ] 

Guozhang Wang commented on KAFKA-6412:
--

[~tedyu] way to start 2018 indeed :)

> Improve synchronization in CachingKeyValueStore methods
> ---
>
> Key: KAFKA-6412
> URL: https://issues.apache.org/jira/browse/KAFKA-6412
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ted Yu
>Assignee: Ted Yu
> Fix For: 1.1.0
>
> Attachments: 6412-jmh.v1.txt, k-6412.v1.txt
>
>
> Currently CachingKeyValueStore methods are synchronized at method level.
> It seems we can use read lock for getter and write lock for put / delete 
> methods.
> For getInternal(), if the underlying thread is streamThread, the 
> getInternal() may trigger eviction. This can be handled by obtaining write 
> lock at the beginning of the method for streamThread.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-6412) Improve synchronization in CachingKeyValueStore methods

2018-01-10 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-6412:


Assignee: Ted Yu

> Improve synchronization in CachingKeyValueStore methods
> ---
>
> Key: KAFKA-6412
> URL: https://issues.apache.org/jira/browse/KAFKA-6412
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ted Yu
>Assignee: Ted Yu
> Fix For: 1.1.0
>
> Attachments: 6412-jmh.v1.txt, k-6412.v1.txt
>
>
> Currently CachingKeyValueStore methods are synchronized at method level.
> It seems we can use read lock for getter and write lock for put / delete 
> methods.
> For getInternal(), if the underlying thread is streamThread, the 
> getInternal() may trigger eviction. This can be handled by obtaining write 
> lock at the beginning of the method for streamThread.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6398) Non-aggregation KTable generation operator does not construct value getter correctly

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320888#comment-16320888
 ] 

ASF GitHub Bot commented on KAFKA-6398:
---

guozhangwang closed pull request #4384: KAFKA-6398: fix KTable.filter that does 
not include its parent's queryable storename
URL: https://github.com/apache/kafka/pull/4384
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 8c79decbb6f..3bc6f4b3474 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -155,8 +155,10 @@ String internalStoreName() {
 builder.internalTopologyBuilder.addProcessor(name, processorSupplier, 
this.name);
 if (storeSupplier != null) {
 builder.internalTopologyBuilder.addStateStore(storeSupplier, name);
+return new KTableImpl<>(builder, name, processorSupplier, 
this.keySerde, this.valSerde, sourceNodes, internalStoreName, true);
+} else {
+return new KTableImpl<>(builder, name, processorSupplier, 
sourceNodes, this.queryableStoreName, false);
 }
-return new KTableImpl<>(builder, name, processorSupplier, 
this.keySerde, this.valSerde, sourceNodes, internalStoreName, internalStoreName 
!= null);
 }
 
 private KTable<K, V> doFilter(final Predicate<? super K, ? super V> 
predicate,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 39ea44f1bfd..39010022a0b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -456,6 +456,7 @@ public final void addSource(final Topology.AutoOffsetReset 
offsetReset,
 }
 
 for (final String predecessor : predecessorNames) {
+Objects.requireNonNull(predecessor, "predecessor name can't be 
null");
 if (predecessor.equals(name)) {
 throw new TopologyException("Processor " + name + " cannot be 
a predecessor of itself.");
 }
@@ -483,6 +484,7 @@ public final void addProcessor(final String name,
 }
 
 for (final String predecessor : predecessorNames) {
+Objects.requireNonNull(predecessor, "predecessor name must not be 
null");
 if (predecessor.equals(name)) {
 throw new TopologyException("Processor " + name + " cannot be 
a predecessor of itself.");
 }
@@ -508,6 +510,7 @@ public final void addStateStore(final 
org.apache.kafka.streams.processor.StateSt
 
 if (processorNames != null) {
 for (final String processorName : processorNames) {
+Objects.requireNonNull(processorName, "processor name must not 
be null");
 connectProcessorAndStateStore(processorName, supplier.name());
 }
 }
@@ -524,6 +527,7 @@ public final void addStateStore(final StoreBuilder 
storeBuilder,
 
 if (processorNames != null) {
 for (final String processorName : processorNames) {
+Objects.requireNonNull(processorName, "processor name must not 
be null");
 connectProcessorAndStateStore(processorName, 
storeBuilder.name());
 }
 }
@@ -602,11 +606,12 @@ private void validateTopicNotAlreadyRegistered(final 
String topic) {
 public final void connectProcessorAndStateStores(final String 
processorName,
  final String... 
stateStoreNames) {
 Objects.requireNonNull(processorName, "processorName can't be null");
-Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be 
null");
+Objects.requireNonNull(stateStoreNames, "state store list must not be 
null");
 if (stateStoreNames.length == 0) {
 throw new TopologyException("Must provide at least one state store 
name.");
 }
 for (final String stateStoreName : stateStoreNames) {
+Objects.requireNonNull(stateStoreName, "state store name must not 
be null");
 connectProcessorAndStateStore(processorName, stateStoreName);
 }
 }
@@ -627,6 +632,7 @@ public final void connectProcessors(final String... 
processorNames) {
 }
 
 for (final String processorName : 

[jira] [Commented] (KAFKA-6438) NSEE while concurrently creating and deleting a topic

2018-01-10 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320694#comment-16320694
 ] 

Ted Yu commented on KAFKA-6438:
---

Two maps are accessed in updateMetadataRequestPartitionInfo(): 
controllerContext.partitionLeadershipInfo and 
controllerContext.partitionReplicaAssignment .

Looks like we should check existence of {{ partition }} before proceeding.
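A small sketch of that kind of existence check (illustrative Java; the controller itself is Scala, and the map contents here are made up):
{code}
import java.util.HashMap;
import java.util.Map;

public class PartitionExistenceCheckSketch {
    public static void main(final String[] args) {
        final Map<String, String> partitionLeadershipInfo = new HashMap<>();
        final String partition = "mytopic-0"; // hypothetical partition

        // Guard the lookup: a concurrent topic deletion may already have removed the
        // entry, so skip the partition instead of letting a missing-key lookup blow up.
        if (partitionLeadershipInfo.containsKey(partition)) {
            final String leaderAndIsr = partitionLeadershipInfo.get(partition);
            System.out.println(partition + " -> " + leaderAndIsr);
        } else {
            System.out.println(partition + " no longer exists; skipping");
        }
    }
}
{code}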

> NSEE while concurrently creating and deleting a topic
> -
>
> Key: KAFKA-6438
> URL: https://issues.apache.org/jira/browse/KAFKA-6438
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 1.0.0
> Environment: kafka_2.11-1.0.0.jar
> OpenJDK Runtime Environment (build 1.8.0_102-b14), OpenJDK 64-Bit Server VM 
> (build 25.102-b14, mixed mode)
> CentOS Linux release 7.3.1611 (Core)
>Reporter: Adam Kotwasinski
>  Labels: reliability
> Fix For: 1.1.0
>
>
> It appears that deleting a topic and creating it at the same time can cause 
> NSEE, which later results in a forced controller shutdown.
> Most probably topics are being created because consumers/producers are still 
> active (yes, this means the deletion is happening blindly).
> The main problem here (for me) is the controller switch; the data loss and 
> following unclean election are acceptable (as we admit to deleting blindly).
> Environment description:
> 20 kafka brokers
> 80k partitions (20k topics 4partitions each)
> 3 node ZK
> Incident:
> {code:java}
> [2018-01-09 11:19:05,912] INFO [Topic Deletion Manager 6], Partition deletion 
> callback for mytopic-2,mytopic-0,mytopic-1,mytopic-3 
> (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:06,237] INFO [Controller id=6] New leader and ISR for 
> partition mytopic-0 is {"leader":-1,"leader_epoch":1,"isr":[]} 
> (kafka.controller.KafkaController)
> [2018-01-09 11:19:06,412] INFO [Topic Deletion Manager 6], Deletion for 
> replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
> topic mytopic in progress (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:07,218] INFO [Topic Deletion Manager 6], Deletion for 
> replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
> topic mytopic in progress (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:07,304] INFO [Topic Deletion Manager 6], Deletion for 
> replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
> topic mytopic in progress (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:07,383] INFO [Topic Deletion Manager 6], Deletion for 
> replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
> topic mytopic in progress (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:07,510] INFO [Topic Deletion Manager 6], Deletion for 
> replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
> topic mytopic in progress (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:07,661] INFO [Topic Deletion Manager 6], Deletion for 
> replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
> topic mytopic in progress (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:07,728] INFO [Topic Deletion Manager 6], Deletion for 
> replicas 9,10,11 for partition mytopic-0,mytopic-1,mytopic-2 of topic mytopic 
> in progress (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:07,924] INFO [PartitionStateMachine controllerId=6] 
> Invoking state change to OfflinePartition for partitions 
> mytopic-2,mytopic-0,mytopic-1,mytopic-3 
> (kafka.controller.PartitionStateMachine)
> [2018-01-09 11:19:07,924] INFO [PartitionStateMachine controllerId=6] 
> Invoking state change to NonExistentPartition for partitions 
> mytopic-2,mytopic-0,mytopic-1,mytopic-3 
> (kafka.controller.PartitionStateMachine)
> [2018-01-09 11:19:08,592] INFO [Controller id=6] New topics: [Set(mytopic, 
> other, other2)], deleted topics: [Set()], new partition replica assignment 
> [Map(other-0 -> Vector(8), mytopic-2 -> Vector(6), mytopic-0 -> Vector(4), 
> other-2 -> Vector(10), mytopic-1 -> Vector(5), mytopic-3 -> Vector(7), 
> other-1 -> Vector(9), other-3 -> Vector(11))] 
> (kafka.controller.KafkaController)
> [2018-01-09 11:19:08,593] INFO [Controller id=6] New topic creation callback 
> for other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 
> (kafka.controller.KafkaController)
> [2018-01-09 11:19:08,596] INFO [Controller id=6] New partition creation 
> callback for 
> other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 
> (kafka.controller.KafkaController)
> [2018-01-09 11:19:08,596] INFO [PartitionStateMachine controllerId=6] 
> Invoking state change to NewPartition for partitions 
> 

[jira] [Commented] (KAFKA-6437) Streams does not warn about missing input topics, but hangs

2018-01-10 Thread Chris Schwarzfischer (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320675#comment-16320675
 ] 

Chris Schwarzfischer commented on KAFKA-6437:
-

Yep, I know it's by design and that doesn't need to change, of course.

"It hangs in the middle" means, that the application is actually starting and 
processing data up to some intermediate topic. This makes it easy to overlook 
that there are topics missing that prevent the application from running 
correctly.
It would make it a lot easier to spot this error if there was an error 
messaging saying that the topic is missing instead of simply switching to 
"RUNNING" as if everything was ok…


> Streams does not warn about missing input topics, but hangs
> ---
>
> Key: KAFKA-6437
> URL: https://issues.apache.org/jira/browse/KAFKA-6437
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
> Environment: Single client on single node broker
>Reporter: Chris Schwarzfischer
>Priority: Minor
>
> *Case*
> Streams application with two input topics being used for a left join.
> When the left side topic is missing upon starting the streams application, it 
> hangs "in the middle" of the topology (at …9, see below). Only parts of 
> the intermediate topics are created (up to …9)
> When the missing input topic is created, the streams application resumes 
> processing.
> {noformat}
> Topology:
> StreamsTask taskId: 2_0
>   ProcessorTopology:
>   KSTREAM-SOURCE-11:
>   topics: 
> [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition]
>   children:   [KTABLE-AGGREGATE-12]
>   KTABLE-AGGREGATE-12:
>   states: 
> [KTABLE-AGGREGATE-STATE-STORE-09]
>   children:   [KTABLE-TOSTREAM-20]
>   KTABLE-TOSTREAM-20:
>   children:   [KSTREAM-SINK-21]
>   KSTREAM-SINK-21:
>   topic:  data_udr_month_customer_aggregration
>   KSTREAM-SOURCE-17:
>   topics: 
> [mystreams_app-KSTREAM-MAP-14-repartition]
>   children:   [KSTREAM-LEFTJOIN-18]
>   KSTREAM-LEFTJOIN-18:
>   states: 
> [KTABLE-AGGREGATE-STATE-STORE-09]
>   children:   [KSTREAM-SINK-19]
>   KSTREAM-SINK-19:
>   topic:  data_UDR_joined
> Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, 
> mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0]
> {noformat}
> *Why this matters*
> The application does quite a lot of preprocessing before joining with the 
> missing input topic. This preprocessing won't happen without the topic, 
> creating a huge backlog of data.
> *Fix*
> Issue a `warn`- or `error`-level message at start to inform about the missing 
> topic and its consequences.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6438) NSEE while concurrently creating and deleting a topic

2018-01-10 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-6438:
---
Fix Version/s: 1.1.0

> NSEE while concurrently creating and deleting a topic
> -
>
> Key: KAFKA-6438
> URL: https://issues.apache.org/jira/browse/KAFKA-6438
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 1.0.0
> Environment: kafka_2.11-1.0.0.jar
> OpenJDK Runtime Environment (build 1.8.0_102-b14), OpenJDK 64-Bit Server VM 
> (build 25.102-b14, mixed mode)
> CentOS Linux release 7.3.1611 (Core)
>Reporter: Adam Kotwasinski
>  Labels: reliability
> Fix For: 1.1.0
>
>
> It appears that deleting a topic and creating it at the same time can cause an 
> NSEE, which later results in a forced controller shutdown.
> Most probably topics are being created because consumers/producers are still 
> active (yes, this means the deletion is happening blindly).
> The main problem here (for me) is the controller switch; the data loss and 
> subsequent unclean election are acceptable (as we admit to deleting blindly).
> Environment description:
> 20 kafka brokers
> 80k partitions (20k topics, 4 partitions each)
> 3 node ZK
> Incident:
> {code:java}
> [2018-01-09 11:19:05,912] INFO [Topic Deletion Manager 6], Partition deletion 
> callback for mytopic-2,mytopic-0,mytopic-1,mytopic-3 
> (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:06,237] INFO [Controller id=6] New leader and ISR for 
> partition mytopic-0 is {"leader":-1,"leader_epoch":1,"isr":[]} 
> (kafka.controller.KafkaController)
> [2018-01-09 11:19:06,412] INFO [Topic Deletion Manager 6], Deletion for 
> replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
> topic mytopic in progress (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:07,218] INFO [Topic Deletion Manager 6], Deletion for 
> replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
> topic mytopic in progress (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:07,304] INFO [Topic Deletion Manager 6], Deletion for 
> replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
> topic mytopic in progress (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:07,383] INFO [Topic Deletion Manager 6], Deletion for 
> replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
> topic mytopic in progress (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:07,510] INFO [Topic Deletion Manager 6], Deletion for 
> replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
> topic mytopic in progress (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:07,661] INFO [Topic Deletion Manager 6], Deletion for 
> replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
> topic mytopic in progress (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:07,728] INFO [Topic Deletion Manager 6], Deletion for 
> replicas 9,10,11 for partition mytopic-0,mytopic-1,mytopic-2 of topic mytopic 
> in progress (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:07,924] INFO [PartitionStateMachine controllerId=6] 
> Invoking state change to OfflinePartition for partitions 
> mytopic-2,mytopic-0,mytopic-1,mytopic-3 
> (kafka.controller.PartitionStateMachine)
> [2018-01-09 11:19:07,924] INFO [PartitionStateMachine controllerId=6] 
> Invoking state change to NonExistentPartition for partitions 
> mytopic-2,mytopic-0,mytopic-1,mytopic-3 
> (kafka.controller.PartitionStateMachine)
> [2018-01-09 11:19:08,592] INFO [Controller id=6] New topics: [Set(mytopic, 
> other, other2)], deleted topics: [Set()], new partition replica assignment 
> [Map(other-0 -> Vector(8), mytopic-2 -> Vector(6), mytopic-0 -> Vector(4), 
> other-2 -> Vector(10), mytopic-1 -> Vector(5), mytopic-3 -> Vector(7), 
> other-1 -> Vector(9), other-3 -> Vector(11))] 
> (kafka.controller.KafkaController)
> [2018-01-09 11:19:08,593] INFO [Controller id=6] New topic creation callback 
> for other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 
> (kafka.controller.KafkaController)
> [2018-01-09 11:19:08,596] INFO [Controller id=6] New partition creation 
> callback for 
> other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 
> (kafka.controller.KafkaController)
> [2018-01-09 11:19:08,596] INFO [PartitionStateMachine controllerId=6] 
> Invoking state change to NewPartition for partitions 
> other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 
> (kafka.controller.PartitionStateMachine)
> [2018-01-09 11:19:08,642] INFO [PartitionStateMachine controllerId=6] 
> Invoking state change to OnlinePartition for partitions 
> 

[jira] [Updated] (KAFKA-6438) NSEE while concurrently creating and deleting a topic

2018-01-10 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-6438:
---
Labels: reliability  (was: )

> NSEE while concurrently creating and deleting a topic
> -
>
> Key: KAFKA-6438
> URL: https://issues.apache.org/jira/browse/KAFKA-6438
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 1.0.0
> Environment: kafka_2.11-1.0.0.jar
> OpenJDK Runtime Environment (build 1.8.0_102-b14), OpenJDK 64-Bit Server VM 
> (build 25.102-b14, mixed mode)
> CentOS Linux release 7.3.1611 (Core)
>Reporter: Adam Kotwasinski
>  Labels: reliability
> Fix For: 1.1.0
>
>
> It appears that deleting a topic and creating it at the same time can cause an 
> NSEE, which later results in a forced controller shutdown.
> Most probably topics are being created because consumers/producers are still 
> active (yes, this means the deletion is happening blindly).
> The main problem here (for me) is the controller switch; the data loss and 
> subsequent unclean election are acceptable (as we admit to deleting blindly).
> Environment description:
> 20 kafka brokers
> 80k partitions (20k topics, 4 partitions each)
> 3 node ZK
> Incident:
> {code:java}
> [2018-01-09 11:19:05,912] INFO [Topic Deletion Manager 6], Partition deletion 
> callback for mytopic-2,mytopic-0,mytopic-1,mytopic-3 
> (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:06,237] INFO [Controller id=6] New leader and ISR for 
> partition mytopic-0 is {"leader":-1,"leader_epoch":1,"isr":[]} 
> (kafka.controller.KafkaController)
> [2018-01-09 11:19:06,412] INFO [Topic Deletion Manager 6], Deletion for 
> replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
> topic mytopic in progress (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:07,218] INFO [Topic Deletion Manager 6], Deletion for 
> replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
> topic mytopic in progress (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:07,304] INFO [Topic Deletion Manager 6], Deletion for 
> replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
> topic mytopic in progress (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:07,383] INFO [Topic Deletion Manager 6], Deletion for 
> replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
> topic mytopic in progress (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:07,510] INFO [Topic Deletion Manager 6], Deletion for 
> replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
> topic mytopic in progress (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:07,661] INFO [Topic Deletion Manager 6], Deletion for 
> replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
> topic mytopic in progress (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:07,728] INFO [Topic Deletion Manager 6], Deletion for 
> replicas 9,10,11 for partition mytopic-0,mytopic-1,mytopic-2 of topic mytopic 
> in progress (kafka.controller.TopicDeletionManager)
> [2018-01-09 11:19:07,924] INFO [PartitionStateMachine controllerId=6] 
> Invoking state change to OfflinePartition for partitions 
> mytopic-2,mytopic-0,mytopic-1,mytopic-3 
> (kafka.controller.PartitionStateMachine)
> [2018-01-09 11:19:07,924] INFO [PartitionStateMachine controllerId=6] 
> Invoking state change to NonExistentPartition for partitions 
> mytopic-2,mytopic-0,mytopic-1,mytopic-3 
> (kafka.controller.PartitionStateMachine)
> [2018-01-09 11:19:08,592] INFO [Controller id=6] New topics: [Set(mytopic, 
> other, other2)], deleted topics: [Set()], new partition replica assignment 
> [Map(other-0 -> Vector(8), mytopic-2 -> Vector(6), mytopic-0 -> Vector(4), 
> other-2 -> Vector(10), mytopic-1 -> Vector(5), mytopic-3 -> Vector(7), 
> other-1 -> Vector(9), other-3 -> Vector(11))] 
> (kafka.controller.KafkaController)
> [2018-01-09 11:19:08,593] INFO [Controller id=6] New topic creation callback 
> for other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 
> (kafka.controller.KafkaController)
> [2018-01-09 11:19:08,596] INFO [Controller id=6] New partition creation 
> callback for 
> other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 
> (kafka.controller.KafkaController)
> [2018-01-09 11:19:08,596] INFO [PartitionStateMachine controllerId=6] 
> Invoking state change to NewPartition for partitions 
> other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 
> (kafka.controller.PartitionStateMachine)
> [2018-01-09 11:19:08,642] INFO [PartitionStateMachine controllerId=6] 
> Invoking state change to OnlinePartition for partitions 
> 

[jira] [Updated] (KAFKA-6438) NSEE while concurrently creating and deleting a topic

2018-01-10 Thread Adam Kotwasinski (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adam Kotwasinski updated KAFKA-6438:

Description: 
It appears that deleting a topic and creating it at the same time can cause an 
NSEE, which later results in a forced controller shutdown.

Most probably topics are being created because consumers/producers are still 
active (yes, this means the deletion is happening blindly).

The main problem here (for me) is the controller switch; the data loss and 
subsequent unclean election are acceptable (as we admit to deleting blindly).

Environment description:
20 kafka brokers
80k partitions (20k topics, 4 partitions each)
3 node ZK

Incident:
{code:java}
[2018-01-09 11:19:05,912] INFO [Topic Deletion Manager 6], Partition deletion 
callback for mytopic-2,mytopic-0,mytopic-1,mytopic-3 
(kafka.controller.TopicDeletionManager)
[2018-01-09 11:19:06,237] INFO [Controller id=6] New leader and ISR for 
partition mytopic-0 is {"leader":-1,"leader_epoch":1,"isr":[]} 
(kafka.controller.KafkaController)
[2018-01-09 11:19:06,412] INFO [Topic Deletion Manager 6], Deletion for 
replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
topic mytopic in progress (kafka.controller.TopicDeletionManager)
[2018-01-09 11:19:07,218] INFO [Topic Deletion Manager 6], Deletion for 
replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
topic mytopic in progress (kafka.controller.TopicDeletionManager)
[2018-01-09 11:19:07,304] INFO [Topic Deletion Manager 6], Deletion for 
replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
topic mytopic in progress (kafka.controller.TopicDeletionManager)
[2018-01-09 11:19:07,383] INFO [Topic Deletion Manager 6], Deletion for 
replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
topic mytopic in progress (kafka.controller.TopicDeletionManager)
[2018-01-09 11:19:07,510] INFO [Topic Deletion Manager 6], Deletion for 
replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
topic mytopic in progress (kafka.controller.TopicDeletionManager)
[2018-01-09 11:19:07,661] INFO [Topic Deletion Manager 6], Deletion for 
replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of 
topic mytopic in progress (kafka.controller.TopicDeletionManager)
[2018-01-09 11:19:07,728] INFO [Topic Deletion Manager 6], Deletion for 
replicas 9,10,11 for partition mytopic-0,mytopic-1,mytopic-2 of topic mytopic 
in progress (kafka.controller.TopicDeletionManager)
[2018-01-09 11:19:07,924] INFO [PartitionStateMachine controllerId=6] Invoking 
state change to OfflinePartition for partitions 
mytopic-2,mytopic-0,mytopic-1,mytopic-3 (kafka.controller.PartitionStateMachine)
[2018-01-09 11:19:07,924] INFO [PartitionStateMachine controllerId=6] Invoking 
state change to NonExistentPartition for partitions 
mytopic-2,mytopic-0,mytopic-1,mytopic-3 (kafka.controller.PartitionStateMachine)
[2018-01-09 11:19:08,592] INFO [Controller id=6] New topics: [Set(mytopic, 
other, other2)], deleted topics: [Set()], new partition replica assignment 
[Map(other-0 -> Vector(8), mytopic-2 -> Vector(6), mytopic-0 -> Vector(4), 
other-2 -> Vector(10), mytopic-1 -> Vector(5), mytopic-3 -> Vector(7), other-1 
-> Vector(9), other-3 -> Vector(11))] (kafka.controller.KafkaController)
[2018-01-09 11:19:08,593] INFO [Controller id=6] New topic creation callback 
for other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 
(kafka.controller.KafkaController)
[2018-01-09 11:19:08,596] INFO [Controller id=6] New partition creation 
callback for 
other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 
(kafka.controller.KafkaController)
[2018-01-09 11:19:08,596] INFO [PartitionStateMachine controllerId=6] Invoking 
state change to NewPartition for partitions 
other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 
(kafka.controller.PartitionStateMachine)
[2018-01-09 11:19:08,642] INFO [PartitionStateMachine controllerId=6] Invoking 
state change to OnlinePartition for partitions 
other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 
(kafka.controller.PartitionStateMachine)
[2018-01-09 11:19:08,828] INFO [Topic Deletion Manager 6], Partition deletion 
callback for mytopic-2,mytopic-0,mytopic-1,mytopic-3 
(kafka.controller.TopicDeletionManager)
[2018-01-09 11:19:09,127] INFO [Controller id=6] New leader and ISR for 
partition mytopic-0 is {"leader":-1,"leader_epoch":1,"isr":[]} 
(kafka.controller.KafkaController)
[2018-01-09 11:19:09,607] ERROR [controller-event-thread]: Error processing 
event TopicDeletion(Set(mytopic, other)) 
(kafka.controller.ControllerEventManager$ControllerEventThread)
java.util.NoSuchElementException: key not found: mytopic-0
at scala.collection.MapLike$class.default(MapLike.scala:228)
at 

[jira] [Commented] (KAFKA-6437) Streams does not warn about missing input topics, but hangs

2018-01-10 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320641#comment-16320641
 ] 

Matthias J. Sax commented on KAFKA-6437:


I cannot follow. What do you mean by "it hangs in the middle"?

Also note that the behavior you describe is "by design" because the underlying 
consumer works this way. It's also well documented that you need to create all 
input topics before you start your application. We can of course do more 
logging, but this does not really "solve" the problem...

> Streams does not warn about missing input topics, but hangs
> ---
>
> Key: KAFKA-6437
> URL: https://issues.apache.org/jira/browse/KAFKA-6437
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
> Environment: Single client on single node broker
>Reporter: Chris Schwarzfischer
>Priority: Minor
>
> *Case*
> Streams application with two input topics being used for a left join.
> When the left side topic is missing upon starting the streams application, it 
> hangs "in the middle" of the topology (at …9, see below). Only parts of 
> the intermediate topics are created (up to …9)
> When the missing input topic is created, the streams application resumes 
> processing.
> {noformat}
> Topology:
> StreamsTask taskId: 2_0
>   ProcessorTopology:
>   KSTREAM-SOURCE-11:
>   topics: 
> [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition]
>   children:   [KTABLE-AGGREGATE-12]
>   KTABLE-AGGREGATE-12:
>   states: 
> [KTABLE-AGGREGATE-STATE-STORE-09]
>   children:   [KTABLE-TOSTREAM-20]
>   KTABLE-TOSTREAM-20:
>   children:   [KSTREAM-SINK-21]
>   KSTREAM-SINK-21:
>   topic:  data_udr_month_customer_aggregration
>   KSTREAM-SOURCE-17:
>   topics: 
> [mystreams_app-KSTREAM-MAP-14-repartition]
>   children:   [KSTREAM-LEFTJOIN-18]
>   KSTREAM-LEFTJOIN-18:
>   states: 
> [KTABLE-AGGREGATE-STATE-STORE-09]
>   children:   [KSTREAM-SINK-19]
>   KSTREAM-SINK-19:
>   topic:  data_UDR_joined
> Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, 
> mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0]
> {noformat}
> *Why this matters*
> The application does quite a lot of preprocessing before joining with the 
> missing input topic. This preprocessing won't happen without the topic, 
> creating a huge backlog of data.
> *Fix*
> Issue a `warn`- or `error`-level message at start to inform about the missing 
> topic and its consequences.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-3625) Move kafka-streams test fixtures into a published package

2018-01-10 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320631#comment-16320631
 ] 

Matthias J. Sax commented on KAFKA-3625:


Thanks a lot for this feedback! This is super helpful!

The artifact you are using atm is not public API, and thus there is no 
guarantee that your tests don't break if you upgrade. (Additionally, you pull 
in all Kafka Streams unit tests that you are actually not interested in.) Thus, 
we want to have a public {{streams-test-utils}} package. About serialization -- 
I completely understand that this is annoying, but we cannot easily avoid it... 
But we try to minimize the required boilerplate code. Hope you participate in 
the KIP discussion that I want to start at the mailing list this week.
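
As a rough sketch of what a test against such a public package could look like 
(based on the API names in the KIP-247 proposal -- TopologyTestDriver and 
ConsumerRecordFactory; the final artifact and method names may differ), assuming 
a trivial uppercasing topology:

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class UppercaseTopologyTest {

    public static void main(final String[] args) {
        // Trivial topology under test: uppercase every value.
        final StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input-topic")
               .mapValues(v -> v.toUpperCase())
               .to("output-topic");
        final Topology topology = builder.build();

        final Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // No broker is needed: the driver feeds records directly through the topology.
        final TopologyTestDriver driver = new TopologyTestDriver(topology, config);
        try {
            final ConsumerRecordFactory<String, String> factory =
                new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new StringSerializer());
            driver.pipeInput(factory.create("input-topic", "k1", "hello"));

            final ProducerRecord<String, String> output =
                driver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer());
            System.out.println(output.value()); // "HELLO"
        } finally {
            driver.close();
        }
    }
}
{code}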

> Move kafka-streams test fixtures into a published package
> -
>
> Key: KAFKA-3625
> URL: https://issues.apache.org/jira/browse/KAFKA-3625
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jeff Klukas
>Assignee: Matthias J. Sax
>Priority: Minor
>  Labels: needs-kip, user-experience
>
> The KStreamTestDriver and related fixtures defined in 
> streams/src/test/java/org/apache/kafka/test would be useful to developers 
> building applications on top of Kafka Streams, but they are not currently 
> exposed in a package.
> I propose moving this directory to live under streams/fixtures/src/main and 
> creating a new 'streams:fixtures' project in the gradle configuration to 
> publish these as a separate package.
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-3625) Move kafka-streams test fixtures into a published package

2018-01-10 Thread Scott Davis (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320605#comment-16320605
 ] 

Scott Davis edited comment on KAFKA-3625 at 1/10/18 4:45 PM:
-

FYI I just set up some unit tests using the (not documented) internal test 
classes in Kafka Streams, and I thought I'd share some thoughts on my 
experience with it.

First, it's super-helpful! The high-level DSL style of Kafka Streams 
applications doesn't fit well into standard unit testing frameworks, and this 
solves that problem. It's also helpful compared with integration testing in the 
sense that it isn't necessary to produce source messages on a broker, wait for 
timeouts, etc.

The internal test classes in 1.0.0 require the unit tests to provide 
serializers. This can create some extra boilerplate work to configure the 
serializers (especially the Confluent AVRO Serde, which requires a schema 
registry). However, I only need to unit test my application logic (i.e. the 
contents of the "filter" and "map" methods, etc). I can see how testing 
serializers is a requirement for testing Kafka Streams internally, but it isn't 
a requirement for testing the logic of Kafka Streams applications. Note: To 
work around this, I created a "JavaObjectSerde", which uses 
java.io.Object(In|Out)putStream, for use by the unit tests.

I needed about 10 lines of boilerplate code in an @Before method to set up the 
ProcessorTopologyTestDriver, which seemed slightly excessive but not 
burdensome. Most of it was to create the serializers and the StreamsConfig, 
which aren't part of my application logic but were required to create the test 
driver.

I was able to get it to work by adding 
{{org.apache.kafka:kafka-streams:1.0.0:test}} to my build.gradle, so it seems 
there already is an artifact. However, there is no documentation, which seems 
to me like the biggest drawback. I learned it by reading the Kafka Streams 
source code.
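
For illustration, a minimal sketch of such a serde (the name JavaObjectSerde and 
the Serializable bound are my assumptions based on the description above); it 
keeps unit tests free of any schema-registry configuration:

{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

public class JavaObjectSerde<T extends Serializable> implements Serializer<T>, Deserializer<T> {

    @Override
    public void configure(final Map<String, ?> configs, final boolean isKey) {
        // Nothing to configure -- the whole point is to avoid serializer setup in tests.
    }

    @Override
    public byte[] serialize(final String topic, final T data) {
        if (data == null) {
            return null;
        }
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(data);
            out.flush();
            return bytes.toByteArray();
        } catch (final IOException e) {
            throw new RuntimeException(e);
        }
    }

    @SuppressWarnings("unchecked")
    @Override
    public T deserialize(final String topic, final byte[] data) {
        if (data == null) {
            return null;
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (T) in.readObject();
        } catch (final IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() {
    }

    // Convenience: wrap this class as a Serde for the test driver's configuration.
    public Serde<T> asSerde() {
        return Serdes.serdeFrom(this, this);
    }
}
{code}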


was (Author: scott.davis):
FYI I just set up some unit tests using the (not documented) internal test 
classes in Kafka Streams, and I thought I'd share some thoughts on my 
experience with it.

First, it's super-helpful! The high-level DSL style of Kafka Streams 
applications doesn't fit well into standard unit testing frameworks, and this 
solves that problem. It's also helpful compared with integration testing in the 
sense that it isn't necessary to produce source messages on a broker, wait for 
timeouts, etc.

The internal test classes in 1.0.0 require the unit tests to provide 
serializers. This can create some extra boilerplate work to configure the 
serializers (especially the Confluent AVRO Serde, which requires a schema 
registry). However, I only need to unit test my application logic (i.e. the 
contents of the "filter" and "map" methods, etc). I can see how testing 
serializers is a requirement for testing Kafka Streams internally, but it isn't 
a requirement for testing the logic of Kafka Streams applications. Note: To 
work around this, I created a "JavaObjectSerde", which uses 
java.io.Object(In|Out)putStream, for use by the unit tests.

I needed about 10 lines of boilerplate code in an @Before method to set up the 
ProcessorTopologyTestDriver, which seemed slightly excessive but not 
burdensome. Most of it was to create the serializers and the StreamsConfig, 
which aren't part of my application logic but were required to create the test 
driver.


> Move kafka-streams test fixtures into a published package
> -
>
> Key: KAFKA-3625
> URL: https://issues.apache.org/jira/browse/KAFKA-3625
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jeff Klukas
>Assignee: Matthias J. Sax
>Priority: Minor
>  Labels: needs-kip, user-experience
>
> The KStreamTestDriver and related fixtures defined in 
> streams/src/test/java/org/apache/kafka/test would be useful to developers 
> building applications on top of Kafka Streams, but they are not currently 
> exposed in a package.
> I propose moving this directory to live under streams/fixtures/src/main and 
> creating a new 'streams:fixtures' project in the gradle configuration to 
> publish these as a separate package.
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-3625) Move kafka-streams test fixtures into a published package

2018-01-10 Thread Scott Davis (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320605#comment-16320605
 ] 

Scott Davis commented on KAFKA-3625:


FYI I just set up some unit tests using the (not documented) internal test 
classes in Kafka Streams, and I thought I'd share some thoughts on my 
experience with it.

First, it's super-helpful! The high-level DSL style of Kafka Streams 
applications doesn't fit well into standard unit testing frameworks, and this 
solves that problem. It's also helpful compared with integration testing in the 
sense that it isn't necessary to produce source messages on a broker, wait for 
timeouts, etc.

The internal test classes in 1.0.0 require the unit tests to provide 
serializers. This can create some extra boilerplate work to configure the 
serializers (especially the Confluent AVRO Serde, which requires a schema 
registry). However, I only need to unit test my application logic (i.e. the 
contents of the "filter" and "map" methods, etc). I can see how testing 
serializers is a requirement for testing Kafka Streams internally, but it isn't 
a requirement for testing the logic of Kafka Streams applications. Note: To 
work around this, I created a "JavaObjectSerde", which uses 
java.io.Object(In|Out)putStream, for use by the unit tests.

I needed about 10 lines of boilerplate code in an @Before method to set up the 
ProcessorTopologyTestDriver, which seemed slightly excessive but not 
burdensome. Most of it was to create the serializers and the StreamsConfig, 
which aren't part of my application logic but were required to create the test 
driver.


> Move kafka-streams test fixtures into a published package
> -
>
> Key: KAFKA-3625
> URL: https://issues.apache.org/jira/browse/KAFKA-3625
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jeff Klukas
>Assignee: Matthias J. Sax
>Priority: Minor
>  Labels: needs-kip, user-experience
>
> The KStreamTestDriver and related fixtures defined in 
> streams/src/test/java/org/apache/kafka/test would be useful to developers 
> building applications on top of Kafka Streams, but they are not currently 
> exposed in a package.
> I propose moving this directory to live under streams/fixtures/src/main and 
> creating a new 'streams:fixtures' project in the gradle configuration to 
> publish these as a separate package.
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6439) "com.streamsets.pipeline.api.StageException: KAFKA_50 - Error writing data to the Kafka broker: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.Ne

2018-01-10 Thread srithar durairaj (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

srithar durairaj updated KAFKA-6439:

Affects Version/s: 0.10.0.1
  Environment: Ubuntu 64bit
  Description: We are using StreamSets to produce data into a Kafka topic 
(3-node cluster). We are facing the following error frequently in production.  
"com.streamsets.pipeline.api.StageException: KAFKA_50 - Error writing data to 
the Kafka broker: java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.NetworkException: The server disconnected before 
a response was received"
  Component/s: network
  Summary: "com.streamsets.pipeline.api.StageException: KAFKA_50 - 
Error writing data to the Kafka broker: 
java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.NetworkException: The server disconnected before 
a response was received"  (was: We are using streamset to produce data into 
kafka topic (3 node cluster). We are facing following error frequently in 
production.  )

> "com.streamsets.pipeline.api.StageException: KAFKA_50 - Error writing data to 
> the Kafka broker: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.NetworkException: The server disconnected 
> before a response was received"
> -
>
> Key: KAFKA-6439
> URL: https://issues.apache.org/jira/browse/KAFKA-6439
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 0.10.0.1
> Environment: Ubuntu 64bit
>Reporter: srithar durairaj
>
> We are using StreamSets to produce data into a Kafka topic (3-node cluster). We 
> are facing the following error frequently in production.  
> "com.streamsets.pipeline.api.StageException: KAFKA_50 - Error writing data to 
> the Kafka broker: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.NetworkException: The server disconnected 
> before a response was received"



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6439) We are using streamset to produce data into kafka topic (3 node cluster). We are facing following error frequently in production.

2018-01-10 Thread srithar durairaj (JIRA)
srithar durairaj created KAFKA-6439:
---

 Summary: We are using streamset to produce data into kafka topic 
(3 node cluster). We are facing following error frequently in production.  
 Key: KAFKA-6439
 URL: https://issues.apache.org/jira/browse/KAFKA-6439
 Project: Kafka
  Issue Type: Bug
Reporter: srithar durairaj






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6437) Streams does not warn about missing input topics, but hangs

2018-01-10 Thread Bill Bejeck (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320487#comment-16320487
 ] 

Bill Bejeck commented on KAFKA-6437:


[~k1th] thanks for reporting.

> Streams does not warn about missing input topics, but hangs
> ---
>
> Key: KAFKA-6437
> URL: https://issues.apache.org/jira/browse/KAFKA-6437
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
> Environment: Single client on single node broker
>Reporter: Chris Schwarzfischer
>Priority: Minor
>
> *Case*
> Streams application with two input topics being used for a left join.
> When the left side topic is missing upon starting the streams application, it 
> hangs "in the middle" of the topology (at …9, see below). Only parts of 
> the intermediate topics are created (up to …9)
> When the missing input topic is created, the streams application resumes 
> processing.
> {noformat}
> Topology:
> StreamsTask taskId: 2_0
>   ProcessorTopology:
>   KSTREAM-SOURCE-11:
>   topics: 
> [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition]
>   children:   [KTABLE-AGGREGATE-12]
>   KTABLE-AGGREGATE-12:
>   states: 
> [KTABLE-AGGREGATE-STATE-STORE-09]
>   children:   [KTABLE-TOSTREAM-20]
>   KTABLE-TOSTREAM-20:
>   children:   [KSTREAM-SINK-21]
>   KSTREAM-SINK-21:
>   topic:  data_udr_month_customer_aggregration
>   KSTREAM-SOURCE-17:
>   topics: 
> [mystreams_app-KSTREAM-MAP-14-repartition]
>   children:   [KSTREAM-LEFTJOIN-18]
>   KSTREAM-LEFTJOIN-18:
>   states: 
> [KTABLE-AGGREGATE-STATE-STORE-09]
>   children:   [KSTREAM-SINK-19]
>   KSTREAM-SINK-19:
>   topic:  data_UDR_joined
> Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, 
> mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0]
> {noformat}
> *Why this matters*
> The application does quite a lot of preprocessing before joining with the 
> missing input topic. This preprocessing won't happen without the topic, 
> creating a huge backlog of data.
> *Fix*
> Issue a `warn`- or `error`-level message at start to inform about the missing 
> topic and its consequences.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6437) Streams does not warn about missing input topics, but hangs

2018-01-10 Thread Chris Schwarzfischer (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Schwarzfischer updated KAFKA-6437:

Issue Type: Improvement  (was: Bug)

> Streams does not warn about missing input topics, but hangs
> ---
>
> Key: KAFKA-6437
> URL: https://issues.apache.org/jira/browse/KAFKA-6437
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
> Environment: Single client on single node broker
>Reporter: Chris Schwarzfischer
>Priority: Minor
>
> *Case*
> Streams application with two input topics being used for a left join.
> When the left side topic is missing upon starting the streams application, it 
> hangs "in the middle" of the topology (at …9, see below). Only parts of 
> the intermediate topics are created (up to …9)
> When the missing input topic is created, the streams application resumes 
> processing.
> {noformat}
> Topology:
> StreamsTask taskId: 2_0
>   ProcessorTopology:
>   KSTREAM-SOURCE-11:
>   topics: 
> [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition]
>   children:   [KTABLE-AGGREGATE-12]
>   KTABLE-AGGREGATE-12:
>   states: 
> [KTABLE-AGGREGATE-STATE-STORE-09]
>   children:   [KTABLE-TOSTREAM-20]
>   KTABLE-TOSTREAM-20:
>   children:   [KSTREAM-SINK-21]
>   KSTREAM-SINK-21:
>   topic:  data_udr_month_customer_aggregration
>   KSTREAM-SOURCE-17:
>   topics: 
> [mystreams_app-KSTREAM-MAP-14-repartition]
>   children:   [KSTREAM-LEFTJOIN-18]
>   KSTREAM-LEFTJOIN-18:
>   states: 
> [KTABLE-AGGREGATE-STATE-STORE-09]
>   children:   [KSTREAM-SINK-19]
>   KSTREAM-SINK-19:
>   topic:  data_UDR_joined
> Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, 
> mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0]
> {noformat}
> *Why this matters*
> The application does quite a lot of preprocessing before joining with the 
> missing input topic. This preprocessing won't happen without the topic, 
> creating a huge backlog of data.
> *Fix*
> Issue a `warn`- or `error`-level message at start to inform about the missing 
> topic and its consequences.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6437) Streams does not warn about missing input topics, but hangs

2018-01-10 Thread Chris Schwarzfischer (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Schwarzfischer updated KAFKA-6437:

Description: 
*Case*
Streams application with two input topics being used for a left join.
When the left side topic is missing upon starting the streams application, it 
hangs "in the middle" of the topology (at …9, see below). Only parts of the 
intermediate topics are created (up to …9)
When the missing input topic is created, the streams application resumes 
processing.

{noformat}
Topology:
StreamsTask taskId: 2_0
ProcessorTopology:
KSTREAM-SOURCE-11:
topics: 
[mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition]
children:   [KTABLE-AGGREGATE-12]
KTABLE-AGGREGATE-12:
states: 
[KTABLE-AGGREGATE-STATE-STORE-09]
children:   [KTABLE-TOSTREAM-20]
KTABLE-TOSTREAM-20:
children:   [KSTREAM-SINK-21]
KSTREAM-SINK-21:
topic:  data_udr_month_customer_aggregration
KSTREAM-SOURCE-17:
topics: 
[mystreams_app-KSTREAM-MAP-14-repartition]
children:   [KSTREAM-LEFTJOIN-18]
KSTREAM-LEFTJOIN-18:
states: 
[KTABLE-AGGREGATE-STATE-STORE-09]
children:   [KSTREAM-SINK-19]
KSTREAM-SINK-19:
topic:  data_UDR_joined
Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, 
mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0]
{noformat}

*Why this matters*
The application does quite a lot of preprocessing before joining with the 
missing input topic. This preprocessing won't happen without the topic, 
creating a huge backlog of data.

*Fix*
Issue a `warn`- or `error`-level message at start to inform about the missing 
topic and its consequences.

  was:
*Case*
Streams application with two input topics being used for a left join.
When the left side topic is missing upon starting the streams application, it 
hangs "in the middle" of the topology (at …9, see below). Only parts of the 
intermediate topics are created (up to …9)
When the missing input topic is created, the streams application resumes 
processing.

{noformat}
Topology:
StreamsTask taskId: 2_0
ProcessorTopology:
KSTREAM-SOURCE-11:
topics: 
[mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition]
children:   [KTABLE-AGGREGATE-12]
KTABLE-AGGREGATE-12:
states: 
[KTABLE-AGGREGATE-STATE-STORE-09]
children:   [KTABLE-TOSTREAM-20]
KTABLE-TOSTREAM-20:
children:   [KSTREAM-SINK-21]
KSTREAM-SINK-21:
topic:  faxout_udr_month_customer_aggregration
KSTREAM-SOURCE-17:
topics: 
[mystreams_app-KSTREAM-MAP-14-repartition]
children:   [KSTREAM-LEFTJOIN-18]
KSTREAM-LEFTJOIN-18:
states: 
[KTABLE-AGGREGATE-STATE-STORE-09]
children:   [KSTREAM-SINK-19]
KSTREAM-SINK-19:
topic:  data_UDR_joined
Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, 
mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0]
{noformat}

*Why this matters*
The application does quite a lot of preprocessing before joining with the 
missing input topic. This preprocessing won't happen without the topic, 
creating a huge backlog of data.

*Fix*
Issue a `warn`- or `error`-level message at start to inform about the missing 
topic and its consequences.


> Streams does not warn about missing input topics, but hangs
> ---
>
> Key: KAFKA-6437
> URL: https://issues.apache.org/jira/browse/KAFKA-6437
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
> Environment: Single client on single node broker
>Reporter: Chris Schwarzfischer
>Priority: Minor
>
> *Case*
> Streams application with two input topics being used for a left join.
> When the left side topic is missing upon starting the streams application, it 
> hangs "in the 

[jira] [Resolved] (KAFKA-2331) Kafka does not spread partitions in a topic among all consumers evenly

2018-01-10 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-2331.
--
Resolution: Auto Closed

Closing inactive issue. The old consumer is no longer supported; please upgrade 
to the Java consumer whenever possible.

> Kafka does not spread partitions in a topic among all consumers evenly
> --
>
> Key: KAFKA-2331
> URL: https://issues.apache.org/jira/browse/KAFKA-2331
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 0.8.1.1
>Reporter: Stefan Miklosovic
>
> I want to have 1 topic with 10 partitions. I am using default configuration 
> of Kafka. I create 1 topic with 10 partitions by that helper script and now I 
> am about to produce messages to it.
> The thing is that even though all partitions are indeed consumed, some consumers 
> have more than 1 partition assigned even though I have a number of consumer 
> threads equal to the partitions in the topic, hence some threads are idle.
> Let's describe it in more detail.
> I know the common advice that you need one consumer thread per partition. I 
> want to be able to commit offsets per partition and this is possible only 
> when I have 1 thread per consumer connector per partition (I am using high 
> level consumer).
> So I create 10 threads, in each thread I am calling 
> Consumer.createJavaConsumerConnector() where I am doing this
> topicCountMap.put("mytopic", 1);
> and in the end I have 1 iterator which consumes messages from 1 partition.
> When I do this 10 times, I have 10 consumers, consumer per thread per 
> partition where I can commit offsets independently per partition, because if I 
> put a number different from 1 in the topic map, I would end up with more than 1 
> consumer thread for that topic for a given consumer instance, so if I am about 
> to commit offsets with the created consumer instance, it would commit them for 
> all threads, which is not desired.
> But the thing is that when I use the consumers, only 7 consumers are involved, 
> and it seems that the other consumer threads are idle, but I do not know why.
> The thing is that I am creating these consumer threads in a loop. So I start 
> first thread (submit to executor service), then another, then another and so 
> on.
> So the scenario is that the first consumer gets all 10 partitions, then the 2nd 
> connects so it is split between these two into 5 and 5 (or something similar), 
> then the other threads connect.
> I understand this as partition rebalancing among all consumers, so it 
> behaves well in the sense that if more consumers are being created, 
> partition rebalancing occurs between these consumers so every consumer should 
> have some partitions to operate upon.
> But from the results I see that there are only 7 consumers, and according to 
> the consumed messages it seems they are split like 3,2,1,1,1,1,1 partition-wise. 
> Yes, these 7 consumers covered all 10 partitions, but why do consumers with more 
> than 1 partition not split and give partitions to the remaining 3 consumers?
> I am pretty much wondering what is happening with the remaining 3 threads and why 
> they do not "grab" partitions from consumers which have more than 1 partition 
> assigned.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-2329) Consumers balance fails when multiple consumers are started simultaneously.

2018-01-10 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-2329.
--
Resolution: Auto Closed

Closing inactive issue. The old consumer is no longer supported; please upgrade 
to the Java consumer whenever possible.

> Consumers balance fails when multiple consumers are started simultaneously.
> ---
>
> Key: KAFKA-2329
> URL: https://issues.apache.org/jira/browse/KAFKA-2329
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.1.1, 0.8.2.1
>Reporter: Ze'ev Eli Klapow
>Assignee: Ze'ev Eli Klapow
>  Labels: consumer, patch
> Attachments: zookeeper-consumer-connector-epoch-node.patch
>
>
> During consumer startup a race condition can occur if multiple consumers are 
> started (nearly) simultaneously. 
> If a second consumer is started while the first consumer is in the middle of 
> {{zkClient.subscribeChildChanges}} the first consumer will never see the 
> registration of the second consumer, because the consumer registration node 
> for the second consumer will be unwatched, and no new child will be 
> registered later. This causes the first consumer to own all partitions, and 
> then never release ownership, causing the second consumer to fail rebalancing.
> The attached patch solves this by using an "epoch" node which all consumers 
> watch and update to trigger a rebalance. When a rebalance is triggered, we 
> check the consumer registrations against a cached state, to avoid unnecessary 
> rebalances. For safety, we also periodically check the consumer registrations 
> and rebalance. We have been using this patch in production at HubSpot for a 
> while and it has eliminated all rebalance issues.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-2025) In multi-consumer setup - explicit commit, commits on all partitions

2018-01-10 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-2025.
--
Resolution: Auto Closed

Closing inactive issue. The old consumer is no longer supported; please upgrade 
to the Java consumer whenever possible.

> In multi-consumer setup - explicit commit, commits on all partitions
> 
>
> Key: KAFKA-2025
> URL: https://issues.apache.org/jira/browse/KAFKA-2025
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.2.0
> Environment: 1. Tested in Windows
> 2. Not tested on Linux
>Reporter: Pradeep G
>Assignee: Neha Narkhede
>Priority: Critical
>
> In a setup where there are two consumers C1 & C2 belonging to consumer group 
> CG, two partitions P1 & P2; with auto-commit disabled.
> An explicit commit on the ConsumerConnector commits on all the consumers, i.e. a 
> commit called by C1 commits all messages being processed by other consumers 
> too, here C2. 
> Ideally C1 should be able to commit only those messages it has consumed and 
> not what is being processed by C2. The effect of this behavior is that if C2 
> crashes while processing message M after C1 commits, the message M being 
> processed by C2 is not available on recovery and is lost forever; in Kafka, M 
> is marked as consumed.
> I read that this would be addressed in the rewrite - 
> https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ConsumerAPI
> Any thoughts on which release this would be addressed in? A quick response 
> would be greatly appreciated.
> Thanks,
> Pradeep
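
For reference, a minimal sketch (the topic, group id and broker address are 
placeholders) of how the Java consumer commits offsets per partition, which 
addresses the behavior described above:

{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PerPartitionCommitExample {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "CG");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("mytopic"));
        try {
            while (true) {
                final ConsumerRecords<String, String> records = consumer.poll(100);
                for (final TopicPartition tp : records.partitions()) {
                    long lastOffset = -1L;
                    for (final ConsumerRecord<String, String> record : records.records(tp)) {
                        // process(record);
                        lastOffset = record.offset();
                    }
                    if (lastOffset >= 0) {
                        // Commit only this partition's progress; other partitions are untouched.
                        consumer.commitSync(Collections.singletonMap(
                                tp, new OffsetAndMetadata(lastOffset + 1)));
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}
{code}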



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5946) Give connector method parameter better name

2018-01-10 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-5946:
--
Labels: connector newbie usability  (was: connector usability)

> Give connector method parameter better name
> ---
>
> Key: KAFKA-5946
> URL: https://issues.apache.org/jira/browse/KAFKA-5946
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ted Yu
>  Labels: connector, newbie, usability
>
> During the development of KAFKA-5657, there were several iterations where 
> the method call didn't match what the connector parameter actually represents.
> [~ewencp] had used connType as equivalent to connClass because Type wasn't 
> used to differentiate source vs sink.
> [~ewencp] proposed the following:
> {code}
> It would help to convert all the uses of connType to connClass first, then 
> standardize on class == java class, type == source/sink, name == 
> user-specified name.
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-1229) Reload broker config without a restart

2018-01-10 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-1229.
--
Resolution: Duplicate

Resolving as a duplicate of KIP-226/KAFKA-6240. Please reopen if there is any concern.

> Reload broker config without a restart
> --
>
> Key: KAFKA-1229
> URL: https://issues.apache.org/jira/browse/KAFKA-1229
> Project: Kafka
>  Issue Type: Wish
>  Components: config
>Affects Versions: 0.8.0
>Reporter: Carlo Cabanilla
>Priority: Minor
>
> In order to minimize client disruption, ideally you'd be able to reload 
> broker config without having to restart the server. On *nix systems the 
> convention is to have the process reread its configuration if it receives a 
> SIGHUP signal.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6412) Improve synchronization in CachingKeyValueStore methods

2018-01-10 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320047#comment-16320047
 ] 

Ted Yu commented on KAFKA-6412:
---

I came up with the initial idea for this JIRA while sitting in a hotel lobby at 
the Grand Canyon.

It was a nice way to start 2018.

> Improve synchronization in CachingKeyValueStore methods
> ---
>
> Key: KAFKA-6412
> URL: https://issues.apache.org/jira/browse/KAFKA-6412
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ted Yu
> Fix For: 1.1.0
>
> Attachments: 6412-jmh.v1.txt, k-6412.v1.txt
>
>
> Currently CachingKeyValueStore methods are synchronized at method level.
> It seems we can use read lock for getter and write lock for put / delete 
> methods.
> For getInternal(), if the underlying thread is streamThread, the 
> getInternal() may trigger eviction. This can be handled by obtaining write 
> lock at the beginning of the method for streamThread.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6412) Improve synchronization in CachingKeyValueStore methods

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320013#comment-16320013
 ] 

ASF GitHub Bot commented on KAFKA-6412:
---

dguy closed pull request #4372: KAFKA-6412 Improve synchronization in 
CachingKeyValueStore methods
URL: https://github.com/apache/kafka/pull/4372
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index f0669a4f6ee..9fff8ccca04 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -31,6 +31,9 @@
 
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 class CachingKeyValueStore extends WrappedStateStore.AbstractStateStore 
implements KeyValueStore, CachedStateStore {
 
@@ -44,6 +47,7 @@
 private InternalProcessorContext context;
 private StateSerdes serdes;
 private Thread streamThread;
+private ReadWriteLock lock = new ReentrantReadWriteLock();
 
 CachingKeyValueStore(final KeyValueStore underlying,
  final Serde keySerde,
@@ -108,9 +112,14 @@ public void setFlushListener(final CacheFlushListener flushListener,
 }
 
 @Override
-public synchronized void flush() {
-cache.flush(cacheName);
-underlying.flush();
+public void flush() {
+lock.writeLock().lock();
+try {
+cache.flush(cacheName);
+underlying.flush();
+} finally {
+lock.writeLock().unlock();
+}
 }
 
 @Override
@@ -131,10 +140,21 @@ public boolean isOpen() {
 }
 
 @Override
-public synchronized byte[] get(final Bytes key) {
+public byte[] get(final Bytes key) {
 validateStoreOpen();
-Objects.requireNonNull(key);
-return getInternal(key);
+Lock theLock;
+if (Thread.currentThread().equals(streamThread)) {
+theLock = lock.writeLock();
+} else {
+theLock = lock.readLock();
+}
+theLock.lock();
+try {
+Objects.requireNonNull(key);
+return getInternal(key);
+} finally {
+theLock.unlock();
+}
 }
 
 private byte[] getInternal(final Bytes key) {
@@ -176,50 +196,75 @@ public boolean isOpen() {
 }
 
 @Override
-public synchronized long approximateNumEntries() {
+public long approximateNumEntries() {
 validateStoreOpen();
-return underlying.approximateNumEntries();
+lock.readLock().lock();
+try {
+return underlying.approximateNumEntries();
+} finally {
+lock.readLock().unlock();
+}
 }
 
 @Override
-public synchronized void put(final Bytes key, final byte[] value) {
+public void put(final Bytes key, final byte[] value) {
 Objects.requireNonNull(key, "key cannot be null");
 validateStoreOpen();
-putInternal(key, value);
+lock.writeLock().lock();
+try {
+putInternal(key, value);
+} finally {
+lock.writeLock().unlock();
+}
 }
 
-private synchronized void putInternal(final Bytes rawKey, final byte[] 
value) {
+private void putInternal(final Bytes rawKey, final byte[] value) {
 Objects.requireNonNull(rawKey, "key cannot be null");
 cache.put(cacheName, rawKey, new LRUCacheEntry(value, true, 
context.offset(),
-  context.timestamp(), context.partition(), context.topic()));
+  context.timestamp(), context.partition(), context.topic()));
 }
 
 @Override
-public synchronized byte[] putIfAbsent(final Bytes key, final byte[] 
value) {
+public byte[] putIfAbsent(final Bytes key, final byte[] value) {
 Objects.requireNonNull(key, "key cannot be null");
 validateStoreOpen();
-final byte[] v = getInternal(key);
-if (v == null) {
-putInternal(key, value);
+lock.writeLock().lock();
+try {
+final byte[] v = getInternal(key);
+if (v == null) {
+putInternal(key, value);
+}
+return v;
+} finally {
+lock.writeLock().unlock();
 }
-return v;
 }
 
 @Override
- 

[jira] [Resolved] (KAFKA-6412) Improve synchronization in CachingKeyValueStore methods

2018-01-10 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy resolved KAFKA-6412.
---
   Resolution: Fixed
Fix Version/s: 1.1.0

Issue resolved by pull request 4372
[https://github.com/apache/kafka/pull/4372]

> Improve synchronization in CachingKeyValueStore methods
> ---
>
> Key: KAFKA-6412
> URL: https://issues.apache.org/jira/browse/KAFKA-6412
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ted Yu
> Fix For: 1.1.0
>
> Attachments: 6412-jmh.v1.txt, k-6412.v1.txt
>
>
> Currently CachingKeyValueStore methods are synchronized at method level.
> It seems we can use read lock for getter and write lock for put / delete 
> methods.
> For getInternal(), if the underlying thread is streamThread, the 
> getInternal() may trigger eviction. This can be handled by obtaining write 
> lock at the beginning of the method for streamThread.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4029) SSL support for Connect REST API

2018-01-10 Thread Jakub Scholz (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319960#comment-16319960
 ] 

Jakub Scholz commented on KAFKA-4029:
-

[~rhauch] Yeah, I have something already. If you want, I can rebase it and 
create a PR so that you can have a look at it. I'm busy with something else 
right now, but I should be able to do it by the end of the week.

> SSL support for Connect REST API
> 
>
> Key: KAFKA-4029
> URL: https://issues.apache.org/jira/browse/KAFKA-4029
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Ewen Cheslack-Postava
>Assignee: Jakub Scholz
>
> Currently the Connect REST API only supports http. We should also add SSL 
> support so access to the REST API can be secured.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-5624) Unsafe use of expired sensors

2018-01-10 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar reassigned KAFKA-5624:


Assignee: Manikumar

> Unsafe use of expired sensors
> -
>
> Key: KAFKA-5624
> URL: https://issues.apache.org/jira/browse/KAFKA-5624
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Manikumar
>
> There seem to be a couple of unhandled cases following sensor expiration:
> 1. Static sensors (such as {{ClientQuotaManager.delayQueueSensor}}) can be 
> expired due to inactivity, but the references will remain valid and usable. 
> Probably a good idea to either ensure we use a "get or create" pattern when 
> accessing the sensor or add a new static registration option which makes the 
> sensor ineligible for expiration.
> 2. It is possible to register metrics through the sensor even after it is 
> expired. We should probably raise an exception instead.
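
Purely as an illustration of the "get or create" idea in item 1 (not the actual 
broker fix, which would also have to guard against concurrent expiration; the 
sensor and metric names below are placeholders):

{code:java}
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;

public class SensorAccessExample {

    private final Metrics metrics = new Metrics();

    // Look the sensor up on every use instead of caching a static reference that the
    // metrics reaper may have expired; re-create it (with its stats) if it is gone.
    public Sensor delayQueueSensor() {
        Sensor sensor = metrics.getSensor("delay-queue-size");
        if (sensor == null) {
            sensor = metrics.sensor("delay-queue-size");
            sensor.add(metrics.metricName("delay-queue-rate", "example-group"), new Rate());
        }
        return sensor;
    }
}
{code}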



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)