[jira] [Commented] (KAFKA-6378) NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper returns null
[ https://issues.apache.org/jira/browse/KAFKA-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321832#comment-16321832 ]

Matthias J. Sax commented on KAFKA-6378:
----------------------------------------

Can you open a PR? If yes, please update the JavaDocs accordingly and also add a corresponding test.

> NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper
> returns null
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6378
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6378
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Andy Bryant
>
> On a Stream->GlobalKTable leftJoin if the KeyValueMapper returns null, the
> stream fails with a NullPointerException (see stacktrace below). On Kafka
> 0.11.0.0 the stream processes this successfully, calling the ValueJoiner with
> the table value set to null.
> The use-case for this is joining a stream to a table containing reference
> data where the stream foreign key may be null. There is no straight-forward
> workaround in this case with Kafka 1.0.0 without having to resort to either
> generating a key that will never match or branching the stream for records
> that don't have the foreign key.
> Exception in thread "workshop-simple-example-client-StreamThread-1" java.lang.NullPointerException
> 	at java.base/java.util.Objects.requireNonNull(Objects.java:221)
> 	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:136)
> 	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:35)
> 	at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:184)
> 	at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:116)
> 	at org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
> 	at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
> 	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
> 	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
> 	at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
> 	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
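[Editor's note] The behaviour difference the reporter describes can be sketched without the Streams API at all. The following toy stand-in (hypothetical names; the `Map`-backed "global table" and the `leftJoin` helper are illustrative, not the real Kafka Streams types) shows the 0.11.0.0 semantics the report relies on: a null mapped key simply means "no match", and the ValueJoiner still runs with a null table value instead of hitting `Objects.requireNonNull` inside the state store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Toy model of KStream-GlobalKTable leftJoin semantics (pre-1.0.0 behaviour).
public class LeftJoinSketch {
    static <K, V, GK, GV, R> R leftJoin(K key, V value,
                                        BiFunction<K, V, GK> keyMapper,
                                        Map<GK, GV> globalTable,
                                        BiFunction<V, GV, R> joiner) {
        GK mappedKey = keyMapper.apply(key, value);
        // A null mapped key means "no match": skip the store lookup entirely
        // rather than passing null into the (null-rejecting) store.
        GV tableValue = (mappedKey == null) ? null : globalTable.get(mappedKey);
        return joiner.apply(value, tableValue);
    }

    public static void main(String[] args) {
        Map<String, String> countries = new HashMap<>();
        countries.put("NZ", "New Zealand");

        // Foreign key present: normal lookup and join.
        System.out.println(leftJoin("order-1", "NZ", (k, v) -> v, countries,
                (v, c) -> v + " -> " + c));

        // Mapper returns null (record has no foreign key): the joiner still
        // runs with a null table value, and nothing throws.
        System.out.println(leftJoin("order-2", "??", (k, v) -> null, countries,
                (v, c) -> v + " -> " + c));
    }
}
```

The 1.0.0 regression corresponds to dropping the null check before the store lookup.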
[jira] [Issue Comment Deleted] (KAFKA-6441) FetchRequest populates buffer of size MinBytes, even if response is smaller
[ https://issues.apache.org/jira/browse/KAFKA-6441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ivan Babrou updated KAFKA-6441:
-------------------------------
    Comment: was deleted

(was: With 0.10.2.0 consumer API Sarama is able to get multiple messages in one FetchResponse. It doesn't seem right to get only one with 0.11.0.0 API.)

> FetchRequest populates buffer of size MinBytes, even if response is smaller
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-6441
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6441
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.11.0.1
>            Reporter: Ivan Babrou
>
> We're using Sarama Go client as consumer, but I don't think it's relevant.
> Producer is syslog-ng with Kafka output, I'm not quite sure which log format
> Kafka itself is using, but I can assume 0.11.0.0, because that's what is set
> in topic settings.
> Our FetchRequest has minSize = 16MB, maxSize = 64, maxWait = 500ms. For a
> silly reason, Kafka decides to reply with at least minSize buffer with just
> one 1KB log message. When Sarama was using older consumer API, everything was
> okay. When we upgraded to 0.11.0.0 consumer API, consumer traffic for
> 125Mbit/s topic spiked to 55000Mbit/s on the wire and consumer wasn't even
> able to keep up.
> 1KB message in a 16MB buffer is 1,600,000% overhead.
> I don't think there's any valid reason to do this.
> It's also mildly annoying that there is no tag 0.11.0.1 in git, looking at
> changes is harder than it should be.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (KAFKA-6441) FetchRequest populates buffer of size MinBytes, even if response is smaller
[ https://issues.apache.org/jira/browse/KAFKA-6441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321715#comment-16321715 ]

Ivan Babrou commented on KAFKA-6441:
------------------------------------

With 0.10.2.0 consumer API Sarama is able to get multiple messages in one FetchResponse. It doesn't seem right to get only one with 0.11.0.0 API.

> FetchRequest populates buffer of size MinBytes, even if response is smaller
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-6441
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6441
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.11.0.1
>            Reporter: Ivan Babrou
>
> We're using Sarama Go client as consumer, but I don't think it's relevant.
> Producer is syslog-ng with Kafka output, I'm not quite sure which log format
> Kafka itself is using, but I can assume 0.11.0.0, because that's what is set
> in topic settings.
> Our FetchRequest has minSize = 16MB, maxSize = 64, maxWait = 500ms. For a
> silly reason, Kafka decides to reply with at least minSize buffer with just
> one 1KB log message. When Sarama was using older consumer API, everything was
> okay. When we upgraded to 0.11.0.0 consumer API, consumer traffic for
> 125Mbit/s topic spiked to 55000Mbit/s on the wire and consumer wasn't even
> able to keep up.
> 1KB message in a 16MB buffer is 1,600,000% overhead.
> I don't think there's any valid reason to do this.
> It's also mildly annoying that there is no tag 0.11.0.1 in git, looking at
> changes is harder than it should be.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Created] (KAFKA-6441) FetchRequest populates buffer of size MinBytes, even if response is smaller
Ivan Babrou created KAFKA-6441:
----------------------------------

             Summary: FetchRequest populates buffer of size MinBytes, even if response is smaller
                 Key: KAFKA-6441
                 URL: https://issues.apache.org/jira/browse/KAFKA-6441
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.11.0.1
            Reporter: Ivan Babrou


We're using Sarama Go client as consumer, but I don't think it's relevant. Producer is syslog-ng with Kafka output, I'm not quite sure which log format Kafka itself is using, but I can assume 0.11.0.0, because that's what is set in topic settings.

Our FetchRequest has minSize = 16MB, maxSize = 64, maxWait = 500ms. For a silly reason, Kafka decides to reply with at least minSize buffer with just one 1KB log message. When Sarama was using older consumer API, everything was okay. When we upgraded to 0.11.0.0 consumer API, consumer traffic for 125Mbit/s topic spiked to 55000Mbit/s on the wire and consumer wasn't even able to keep up.

1KB message in a 16MB buffer is 1,600,000% overhead.

I don't think there's any valid reason to do this.

It's also mildly annoying that there is no tag 0.11.0.1 in git, looking at changes is harder than it should be.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
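[Editor's note] The sizing knobs involved in this report are the standard consumer fetch properties (`fetch.min.bytes`, `fetch.max.bytes`, `fetch.max.wait.ms`). A minimal sketch of how they map onto the report's minSize/maxSize/maxWait, using plain `Properties` so it stands alone (the cap value here is illustrative, not a recommendation):

```java
import java.util.Properties;

// Consumer fetch sizing properties corresponding to the report's
// minSize / maxSize / maxWait. With fetch.min.bytes left at its default of 1,
// the broker replies as soon as any data is available, so there is no reason
// for a response (or a client-side buffer) to be padded out to a large minimum.
public class FetchTuning {
    static Properties fetchProps() {
        Properties props = new Properties();
        props.setProperty("fetch.min.bytes", "1");           // default: reply as soon as data exists
        props.setProperty("fetch.max.bytes", String.valueOf(64 * 1024 * 1024)); // illustrative cap
        props.setProperty("fetch.max.wait.ms", "500");       // matches the 500ms maxWait in the report
        return props;
    }

    public static void main(String[] args) {
        System.out.println(fetchProps());
    }
}
```

Whether the over-allocation lives in the broker or in the client's buffer management, a 16MB `fetch.min.bytes` amplifies it; keeping the minimum small sidesteps the reported blowup.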
[jira] [Commented] (KAFKA-6265) GlobalKTable missing #queryableStoreName()
[ https://issues.apache.org/jira/browse/KAFKA-6265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321568#comment-16321568 ]

ASF GitHub Bot commented on KAFKA-6265:
---------------------------------------

ConcurrencyPractitioner opened a new pull request #4413: [KAFKA-6265] GlobalKTable missing #queryableStoreName()
URL: https://github.com/apache/kafka/pull/4413

A spinoff of original pull request #4340 for resolving conflicts.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> GlobalKTable missing #queryableStoreName()
> ------------------------------------------
>
>                 Key: KAFKA-6265
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6265
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Antony Stubbs
>            Assignee: Richard Yu
>              Labels: beginner, needs-kip, newbie
>             Fix For: 1.1.0
>
> KTable has the nicely useful #queryableStoreName(); it seems to be missing
> from GlobalKTable.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (KAFKA-6265) GlobalKTable missing #queryableStoreName()
[ https://issues.apache.org/jira/browse/KAFKA-6265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321551#comment-16321551 ]

ASF GitHub Bot commented on KAFKA-6265:
---------------------------------------

ConcurrencyPractitioner closed pull request #4412: [KAFKA-6265] GlobalKTable missing #queryableStoreName()
URL: https://github.com/apache/kafka/pull/4412

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as
it won't show otherwise due to GitHub magic):

diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html
index 5730f5330e1..20d01f2c148 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -2294,22 +2294,22 @@
-      // Get the window store named "CountsWindowStore"
-      ReadOnlyWindowStore<String, Long> windowStore =
-          streams.store("CountsWindowStore", QueryableStoreTypes.windowStore());
-
-      // Fetch values for the key "world" for all of the windows available in this application instance.
-      // To get *all* available windows we fetch windows from the beginning of time until now.
-      long timeFrom = 0; // beginning of time = oldest available
-      long timeTo = System.currentTimeMillis(); // now (in processing-time)
-      WindowStoreIterator<Long> iterator = windowStore.fetch("world", timeFrom, timeTo);
-      while (iterator.hasNext()) {
+    // Get the window store named "CountsWindowStore"
+    ReadOnlyWindowStore<String, Long> windowStore =
+        streams.store("CountsWindowStore", QueryableStoreTypes.windowStore());
+
+    // Fetch values for the key "world" for all of the windows available in this application instance.
+    // To get *all* available windows we fetch windows from the beginning of time until now.
+    long timeFrom = 0; // beginning of time = oldest available
+    long timeTo = System.currentTimeMillis(); // now (in processing-time)
+    WindowStoreIterator<Long> iterator = windowStore.fetch("world", timeFrom, timeTo);
+    while (iterator.hasNext()) {
       KeyValue<Long, Long> next = iterator.next();
       long windowTimestamp = next.key;
       System.out.println("Count of 'world' @ time " + windowTimestamp + " is " + next.value);
-      }
-      iterator.close();
-
+    }
+    iterator.close();
+
 Querying local custom state stores
@@ -3023,4 +3023,4 @@ Executing Your Kafka Streams
   // Display docs subnav items
   $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
 });
-
\ No newline at end of file
+
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 297405802ca..cd5ad433159 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -44,6 +44,15 @@ Upgrade Guide API Changes
 See below a complete list of 0.10.1 API changes that allow you to advance your application and/or simplify your code base, including the usage of new features.
+
+Streams API changes in 1.1.0
+
+    New method in GlobalKTable
+
+A method has been provided such that it will return the store name associated with the GlobalKTable or null if the store name is non-queryable.
+
 Streams API changes in 1.0.0
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
index 72286c20529..e58f67fc5b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
@@ -67,4 +67,10 @@
  */
 @InterfaceStability.Evolving
 public interface GlobalKTable<K, V> {
+    /**
+     * Get the name of the local state store that can be used to query this {@code GlobalKTable}.
+     *
+     * @return the underlying state store name, or {@code null} if this {@code GlobalKTable} cannot be queried.
+     */
+    String queryableStoreName();
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
index 34e23752444..8fcdfed1e52 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
@@ -21,13 +21,29 @@
 public class GlobalKTableImpl<K, V> implements GlobalKTable<K, V> {
     private final KTableValueGetterSupplier<K, V> valueGetterSupplier;
+    private final boolean queryable;
     public GlobalKTableImpl(final KTableValueGetterSupplier
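[Editor's note] The contract this PR adds is small enough to model standalone. The sketch below (hypothetical `Queryable`/`TableHandle` names, not the real `GlobalKTableImpl` internals) illustrates the documented behaviour: return the store name when the table is materialized with a queryable store, otherwise return null.

```java
// Toy model of the queryableStoreName() contract introduced in the diff above.
public class StoreNameContract {
    interface Queryable {
        // Store name, or null if the table cannot be queried.
        String queryableStoreName();
    }

    static final class TableHandle implements Queryable {
        private final String storeName;
        private final boolean queryable;

        TableHandle(String storeName, boolean queryable) {
            this.storeName = storeName;
            this.queryable = queryable;
        }

        @Override
        public String queryableStoreName() {
            return queryable ? storeName : null;
        }
    }

    public static void main(String[] args) {
        Queryable materialized = new TableHandle("reference-data-store", true);
        Queryable internal = new TableHandle("internal-store", false);
        System.out.println(materialized.queryableStoreName()); // the store name
        System.out.println(internal.queryableStoreName());     // null
    }
}
```

Callers are expected to null-check before passing the name to interactive queries, mirroring the existing KTable#queryableStoreName() behaviour.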
[jira] [Commented] (KAFKA-6378) NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper returns null
[ https://issues.apache.org/jira/browse/KAFKA-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321393#comment-16321393 ]

Ted Yu commented on KAFKA-6378:
-------------------------------

Here is the two line change:
{code}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals
index bac930d..dd7877b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
@@ -53,7 +53,8 @@ class KStreamKTableJoinProcessor extends AbstractProcessor
{code}

> NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper
> returns null
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6378
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6378
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Andy Bryant
>
> On a Stream->GlobalKTable leftJoin if the KeyValueMapper returns null, the
> stream fails with a NullPointerException (see stacktrace below). On Kafka
> 0.11.0.0 the stream processes this successfully, calling the ValueJoiner with
> the table value set to null.
> The use-case for this is joining a stream to a table containing reference
> data where the stream foreign key may be null. There is no straight-forward
> workaround in this case with Kafka 1.0.0 without having to resort to either
> generating a key that will never match or branching the stream for records
> that don't have the foreign key.
> Exception in thread "workshop-simple-example-client-StreamThread-1" java.lang.NullPointerException
> 	at java.base/java.util.Objects.requireNonNull(Objects.java:221)
> 	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:136)
> 	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:35)
> 	at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:184)
> 	at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:116)
> 	at org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
> 	at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
> 	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
> 	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
> 	at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
> 	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
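[Editor's note] The body of Ted Yu's diff did not survive the mailing-list formatting (only the hunk header remains). As a hypothetical sketch of what such a null-key guard typically looks like — NOT the actual patch — the idea is: skip the state-store lookup when the mapped key is null, and for a left join still call the joiner with a null table value. Standalone stand-in types are used so the snippet runs by itself:

```java
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical shape of the discussed two-line guard; illustrative only.
public class JoinGuardSketch {
    static <K2, V1, V2, R> R process(K2 mappedKey,
                                     V1 streamValue,
                                     Map<K2, V2> store,
                                     BiFunction<V1, V2, R> joiner,
                                     boolean leftJoin) {
        // Guard: a null mapped key would otherwise reach the store's get(),
        // which rejects null via Objects.requireNonNull (the reported NPE).
        final V2 tableValue = (mappedKey == null) ? null : store.get(mappedKey);
        if (tableValue != null || leftJoin) {
            return joiner.apply(streamValue, tableValue);
        }
        return null; // inner join with no match: emit nothing
    }

    public static void main(String[] args) {
        // Left join with a null foreign key: joiner runs with null table value.
        System.out.println(process(null, "order", Map.of("k", "ref"),
                (v1, v2) -> v1 + "/" + v2, true));
    }
}
```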
[jira] [Commented] (KAFKA-6378) NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper returns null
[ https://issues.apache.org/jira/browse/KAFKA-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321374#comment-16321374 ]

Andy Bryant commented on KAFKA-6378:
------------------------------------

A sentinel value is ok where only a subset of the available values for a type are valid, but it does seem messy to have to convert {{null}} values to the sentinel before the join, then back to {{null}} again in the merge function afterwards. It also doesn't cater for the case where you can't pick a sentinel because all values of a type are valid.

Since, as Matthias pointed out, {{null}} can never be a valid key, explicitly calling it out in the docs as indicating no match, and updating the code so it doesn't crash (a two-line change by the looks), seems like a nice clean approach to me.

> NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper
> returns null
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6378
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6378
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Andy Bryant
>
> On a Stream->GlobalKTable leftJoin if the KeyValueMapper returns null, the
> stream fails with a NullPointerException (see stacktrace below). On Kafka
> 0.11.0.0 the stream processes this successfully, calling the ValueJoiner with
> the table value set to null.
> The use-case for this is joining a stream to a table containing reference
> data where the stream foreign key may be null. There is no straight-forward
> workaround in this case with Kafka 1.0.0 without having to resort to either
> generating a key that will never match or branching the stream for records
> that don't have the foreign key.
> Exception in thread "workshop-simple-example-client-StreamThread-1" java.lang.NullPointerException
> 	at java.base/java.util.Objects.requireNonNull(Objects.java:221)
> 	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:136)
> 	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:35)
> 	at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:184)
> 	at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:116)
> 	at org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
> 	at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
> 	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
> 	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
> 	at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
> 	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Assigned] (KAFKA-6205) Have State Stores Restore Before Initializing Topology
[ https://issues.apache.org/jira/browse/KAFKA-6205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bill Bejeck reassigned KAFKA-6205:
----------------------------------

    Assignee: Bill Bejeck

> Have State Stores Restore Before Initializing Topology
> ------------------------------------------------------
>
>                 Key: KAFKA-6205
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6205
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.0.0, 0.11.0.2
>            Reporter: Bill Bejeck
>            Assignee: Bill Bejeck
>             Fix For: 1.0.1, 0.11.0.3
>
> Streams should restore state stores (if needed) before initializing the
> topology.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Assigned] (KAFKA-4969) State-store workload-aware StreamsPartitionAssignor
[ https://issues.apache.org/jira/browse/KAFKA-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bill Bejeck reassigned KAFKA-4969:
----------------------------------

    Assignee: Bill Bejeck

> State-store workload-aware StreamsPartitionAssignor
> ---------------------------------------------------
>
>                 Key: KAFKA-4969
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4969
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Bill Bejeck
>
> Currently, {{StreamPartitionsAssigner}} does not distinguish different
> "types" of tasks. For example, a task can be stateless or have one or multiple
> stores.
> This can lead to a suboptimal task placement: assume there are 2 stateless
> and 2 stateful tasks and the app is running with 2 instances. To share the
> "store load" it would be good to place one stateless and one stateful task
> per instance. Right now, there is no guarantee about this, and it can happen
> that one instance processes both stateless tasks while the other processes
> both stateful tasks.
> We should improve {{StreamPartitionAssignor}} and introduce "task types"
> including a cost model for task placement. We should consider the following
> parameters:
> - number of stores
> - number of sources/sinks
> - number of processors
> - regular task vs standby task
> This improvement should be backed by a design document in the project wiki
> (no KIP required though) as it's a fairly complex change.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (KAFKA-4969) State-store workload-aware StreamsPartitionAssignor
[ https://issues.apache.org/jira/browse/KAFKA-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321183#comment-16321183 ]

ASF GitHub Bot commented on KAFKA-4969:
---------------------------------------

bbejeck opened a new pull request #4410: KAFKA-4969: attempt to evenly distribute load of tasks
URL: https://github.com/apache/kafka/pull/4410

This PR is an initial attempt to evenly distribute tasks with heavy processing across clients using a somewhat naive approach. The rationale is that by making sure each client's assignment is not comprised entirely of tasks with the same `topicGroupId`, then if there is one sub-topology doing heavy processing and another sub-topology that is relatively light, the processing load is somewhat evenly distributed.

This process only looks at active tasks; standby tasks are not given this consideration, as we can end up in a state where clients have the same task assignments, i.e. [aT1, sT2] [aT2, sT1]. We plan to do a follow-on task at a later date where we weigh tasks with state stores to distribute tasks with state stores evenly.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> State-store workload-aware StreamsPartitionAssignor
> ---------------------------------------------------
>
>                 Key: KAFKA-4969
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4969
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Matthias J. Sax
>
> Currently, {{StreamPartitionsAssigner}} does not distinguish different
> "types" of tasks. For example, a task can be stateless or have one or multiple
> stores.
> This can lead to a suboptimal task placement: assume there are 2 stateless
> and 2 stateful tasks and the app is running with 2 instances. To share the
> "store load" it would be good to place one stateless and one stateful task
> per instance. Right now, there is no guarantee about this, and it can happen
> that one instance processes both stateless tasks while the other processes
> both stateful tasks.
> We should improve {{StreamPartitionAssignor}} and introduce "task types"
> including a cost model for task placement. We should consider the following
> parameters:
> - number of stores
> - number of sources/sinks
> - number of processors
> - regular task vs standby task
> This improvement should be backed by a design document in the project wiki
> (no KIP required though) as it's a fairly complex change.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
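[Editor's note] The placement goal in the description — one stateless plus one stateful task per instance rather than both stateful tasks on one client — can be illustrated with a naive interleaving sketch. The task/client model below is hypothetical (plain strings, not the real StreamsPartitionAssignor data structures); the point is only that round-robining each task class independently gives every client a similar mix:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Naive sketch of workload-aware placement: spread stateful and stateless
// tasks across clients so no instance carries all the store load.
public class TaskSpread {
    static Map<String, List<String>> assign(List<String> statefulTasks,
                                            List<String> statelessTasks,
                                            List<String> clients) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String c : clients) assignment.put(c, new ArrayList<>());
        // Round-robin each task class independently, so every client ends up
        // with roughly the same number of stateful AND stateless tasks.
        int i = 0;
        for (String t : statefulTasks) assignment.get(clients.get(i++ % clients.size())).add(t);
        i = 0;
        for (String t : statelessTasks) assignment.get(clients.get(i++ % clients.size())).add(t);
        return assignment;
    }

    public static void main(String[] args) {
        // 2 stateful (1_x) + 2 stateless (0_x) tasks over 2 instances:
        // each instance gets one of each, matching the issue's example.
        System.out.println(assign(
                List.of("1_0", "1_1"), List.of("0_0", "0_1"),
                List.of("clientA", "clientB")));
    }
}
```

The real cost model sketched in the issue would additionally weigh stores, sources/sinks, processors, and standby-vs-active status rather than treating all stateful tasks as equal.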
[jira] [Commented] (KAFKA-4711) Change Default unclean.leader.election.enabled from True to False (KIP-106)
[ https://issues.apache.org/jira/browse/KAFKA-4711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321146#comment-16321146 ]

ASF GitHub Bot commented on KAFKA-4711:
---------------------------------------

junrao closed pull request #3567: KAFKA-4711: fix docs on unclean.leader.election.enable default
URL: https://github.com/apache/kafka/pull/3567

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as
it won't show otherwise due to GitHub magic):

diff --git a/docs/design.html b/docs/design.html
index 564df386db7..69d1941effd 100644
--- a/docs/design.html
+++ b/docs/design.html
@@ -238,8 +238,8 @@ 4.6 Message Delivery Semantics
 can fail, cases where there are multiple consumer processes, or cases where data written to disk can be lost).
 Kafka's semantics are straight-forward. When publishing a message we have a notion of the message being "committed" to the log. Once a published message is committed it will not be lost as long as one broker that
-replicates the partition to which this message was written remains "alive". The definition of committed message, alive partition as well as a description of which types of failures we attempt to handle will be
-described in more detail in the next section. For now let's assume a perfect, lossless broker and try to understand the guarantees to the producer and consumer. If a producer attempts to publish a message and
+replicates the partition to which this message was written remains "alive". The definition of committed message, alive partition as well as a description of which types of failures we attempt to handle will be
+described in more detail in the next section. For now let's assume a perfect, lossless broker and try to understand the guarantees to the producer and consumer.
 If a producer attempts to publish a message and
 experiences a network error it cannot be sure if this error happened before or after the message was committed. This is similar to the semantics of inserting into a database table with an autogenerated key.
 Prior to 0.11.0.0, if a producer failed to receive a response indicating that a message was committed, it had little choice but to resend the message. This provides at-least-once delivery semantics since the
@@ -309,11 +309,11 @@ 4.7 Replication
 handle so-called "Byzantine" failures in which nodes produce arbitrary or malicious responses (perhaps due to bugs or foul play).
 We can now more precisely define that a message is considered committed when all in sync replicas for that partition have applied it to their log.
-Only committed messages are ever given out to the consumer. This means that the consumer need not worry about potentially seeing a message that could be lost if the leader fails. Producers, on the other hand,
-have the option of either waiting for the message to be committed or not, depending on their preference for tradeoff between latency and durability. This preference is controlled by the acks setting that the
+Only committed messages are ever given out to the consumer. This means that the consumer need not worry about potentially seeing a message that could be lost if the leader fails. Producers, on the other hand,
+have the option of either waiting for the message to be committed or not, depending on their preference for tradeoff between latency and durability. This preference is controlled by the acks setting that the
 producer uses. Note that topics have a setting for the "minimum number" of in-sync replicas that is checked when the producer requests acknowledgment that a message
-has been written to the full set of in-sync replicas. If a less stringent acknowledgement is requested by the producer, then the message can be committed, and consumed,
+has been written to the full set of in-sync replicas. If a less stringent acknowledgement is requested by the producer, then the message can be committed, and consumed,
 even if the number of in-sync replicas is lower than the minimum (e.g. it can be as low as just the leader).
 The guarantee that Kafka offers is that a committed message will not be lost, as long as there is at least one in sync replica alive, at all times.
@@ -384,8 +384,8 @@ Unclean leader ele
 This is a simple tradeoff between availability and consistency. If we wait for replicas in the ISR, then we will remain unavailable as long as those replicas are down. If such replicas were destroyed or their data
 was lost, then we are permanently down. If, on the other hand, a non-in-sync replica comes back to life and we allow it to become leader, then its log becomes the source of truth even though it is not guaranteed to
have every
[jira] [Updated] (KAFKA-6440) Expose Connect leader via REST
[ https://issues.apache.org/jira/browse/KAFKA-6440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryan P updated KAFKA-6440: -- Description: [KIP-196|https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework] adds a metric to expose the current leader of a connect cluster. It would be helpful to make this information available via REST, along with a list of all the cluster's members. (was: [KIP-196|https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework] adds a metric to expose the current leader of a connect cluster. It would be helpful to make this information available via REST as well as it would not require the use of a JMX client. In ad) > Expose Connect leader via REST > -- > > Key: KAFKA-6440 > URL: https://issues.apache.org/jira/browse/KAFKA-6440 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Ryan P >Priority: Minor > Labels: needs-kip > > [KIP-196|https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework] > adds a metric to expose the current leader of a connect cluster. It would be > helpful to make this information available via REST, along with a list of all > the cluster's members. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-6440) Expose Connect leader via REST
[ https://issues.apache.org/jira/browse/KAFKA-6440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryan P updated KAFKA-6440: -- Description: [KIP-196|https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework] adds a metric to expose the current leader of a connect cluster. It would be helpful to make this information available via REST as well as it would not require the use of a JMX client. In ad was:[KIP-196|https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework] adds a metric to expose the current leader of a connect cluster. It would be helpful to make this information available via REST as well as it would not require the use of a JMX client. > Expose Connect leader via REST > -- > > Key: KAFKA-6440 > URL: https://issues.apache.org/jira/browse/KAFKA-6440 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Ryan P >Priority: Minor > Labels: needs-kip > > [KIP-196|https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework] > adds a metric to expose the current leader of a connect cluster. It would be > helpful to make this information available via REST as well as it would not > require the use of a JMX client. > In ad -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6440) Expose Connect leader via REST
[ https://issues.apache.org/jira/browse/KAFKA-6440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16321028#comment-16321028 ] Ryan P commented on KAFKA-6440: --- [~rhauch] +1, editing description to accommodate this > Expose Connect leader via REST > -- > > Key: KAFKA-6440 > URL: https://issues.apache.org/jira/browse/KAFKA-6440 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Ryan P >Priority: Minor > Labels: needs-kip > > [KIP-196|https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework] > adds a metric to expose the current leader of a connect cluster. It would be > helpful to make this information available via REST as well as it would not > require the use of a JMX client. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6440) Expose Connect leader via REST
Ryan P created KAFKA-6440: - Summary: Expose Connect leader via REST Key: KAFKA-6440 URL: https://issues.apache.org/jira/browse/KAFKA-6440 Project: Kafka Issue Type: Improvement Components: KafkaConnect Affects Versions: 1.0.0 Reporter: Ryan P Priority: Minor [KIP-196|https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework] adds a metric to expose the current leader of a connect cluster. It would be helpful to make this information available via REST as well as it would not require the use of a JMX client. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-6440) Expose Connect leader via REST
[ https://issues.apache.org/jira/browse/KAFKA-6440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-6440: - Labels: needs-kip (was: ) > Expose Connect leader via REST > -- > > Key: KAFKA-6440 > URL: https://issues.apache.org/jira/browse/KAFKA-6440 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Ryan P >Priority: Minor > Labels: needs-kip > > [KIP-196|https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework] > adds a metric to expose the current leader of a connect cluster. It would be > helpful to make this information available via REST as well as it would not > require the use of a JMX client. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-6435) Application Reset Tool might delete incorrect internal topics
[ https://issues.apache.org/jira/browse/KAFKA-6435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-6435: - Labels: bug (was: ) > Application Reset Tool might delete incorrect internal topics > - > > Key: KAFKA-6435 > URL: https://issues.apache.org/jira/browse/KAFKA-6435 > Project: Kafka > Issue Type: Bug > Components: streams, tools >Affects Versions: 1.0.0 >Reporter: Matthias J. Sax > Labels: bug > > The streams application reset tool deletes all topics that start with > {{<application.id>-}}. > If people have two versions of the same application and name them {{"app"}} > and {{"app-v2"}}, resetting {{"app"}} would also delete the internal topics > of {{"app-v2"}}. > We either need to disallow the dash in the application ID, or improve the > topic identification logic in the reset tool to fix this. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
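The collision described in KAFKA-6435 comes down to plain prefix matching. A minimal sketch, assuming the tool matches internal topics by the application-id prefix (class and method names here are illustrative, not the reset tool's actual code):

```java
// Hypothetical sketch of the matching rule described in the ticket: an
// internal topic is treated as belonging to an application when it starts
// with "<application.id>-". Names are illustrative assumptions.
public class ResetPrefixCollision {
    static boolean matchesApplication(String topic, String applicationId) {
        return topic.startsWith(applicationId + "-");
    }

    public static void main(String[] args) {
        String internalTopic = "app-v2-KSTREAM-AGGREGATE-STATE-STORE-0000000001-repartition";
        // Matches its own application, as intended:
        System.out.println(matchesApplication(internalTopic, "app-v2")); // true
        // But also matches the shorter application id "app" -- the reported bug:
        System.out.println(matchesApplication(internalTopic, "app"));    // true
    }
}
```

Under this rule, any application id that is a dash-separated prefix of another application id claims the other's internal topics, which is why the ticket proposes either disallowing the dash or tightening the identification logic.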
[jira] [Updated] (KAFKA-6412) Improve synchronization in CachingKeyValueStore methods
[ https://issues.apache.org/jira/browse/KAFKA-6412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-6412: - Labels: (was: bug) > Improve synchronization in CachingKeyValueStore methods > --- > > Key: KAFKA-6412 > URL: https://issues.apache.org/jira/browse/KAFKA-6412 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Ted Yu >Assignee: Ted Yu > Fix For: 1.1.0 > > Attachments: 6412-jmh.v1.txt, k-6412.v1.txt > > > Currently CachingKeyValueStore methods are synchronized at method level. > It seems we can use read lock for getter and write lock for put / delete > methods. > For getInternal(), if the underlying thread is streamThread, the > getInternal() may trigger eviction. This can be handled by obtaining write > lock at the beginning of the method for streamThread. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
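The read-lock-for-getters, write-lock-for-mutators split proposed in KAFKA-6412 can be sketched with `java.util.concurrent.locks.ReentrantReadWriteLock`. This is a toy store illustrating the locking idea only, not the actual CachingKeyValueStore code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Toy key-value store illustrating the proposal: readers share the read lock,
// while put/delete take the exclusive write lock. A sketch of the idea only;
// the real CachingKeyValueStore also has cache-eviction concerns on top.
public class LockedStore {
    private final Map<String, String> map = new HashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    public String get(String key) {
        lock.readLock().lock();   // multiple readers may hold this concurrently
        try {
            return map.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }

    public void put(String key, String value) {
        lock.writeLock().lock();  // exclusive: blocks readers and other writers
        try {
            map.put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }

    public String delete(String key) {
        lock.writeLock().lock();
        try {
            return map.remove(key);
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

Note the caveat raised in the ticket: a getter that can trigger cache eviction (as getInternal() can on the stream thread) mutates state, so that path would need the write lock despite being a read.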
[jira] [Updated] (KAFKA-6412) Improve synchronization in CachingKeyValueStore methods
[ https://issues.apache.org/jira/browse/KAFKA-6412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-6412: - Labels: bug (was: ) > Improve synchronization in CachingKeyValueStore methods > --- > > Key: KAFKA-6412 > URL: https://issues.apache.org/jira/browse/KAFKA-6412 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Ted Yu >Assignee: Ted Yu > Fix For: 1.1.0 > > Attachments: 6412-jmh.v1.txt, k-6412.v1.txt > > > Currently CachingKeyValueStore methods are synchronized at method level. > It seems we can use read lock for getter and write lock for put / delete > methods. > For getInternal(), if the underlying thread is streamThread, the > getInternal() may trigger eviction. This can be handled by obtaining write > lock at the beginning of the method for streamThread. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-6437) Streams does not warn about missing input topics, but hangs
[ https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-6437: - Labels: newbie (was: ) > Streams does not warn about missing input topics, but hangs > --- > > Key: KAFKA-6437 > URL: https://issues.apache.org/jira/browse/KAFKA-6437 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 > Environment: Single client on single node broker >Reporter: Chris Schwarzfischer >Priority: Minor > Labels: newbie > > *Case* > Streams application with two input topics being used for a left join. > When the left side topic is missing upon starting the streams application, it > hangs "in the middle" of the topology (at …9, see below). Only parts of > the intermediate topics are created (up to …9) > When the missing input topic is created, the streams application resumes > processing. > {noformat} > Topology: > StreamsTask taskId: 2_0 > ProcessorTopology: > KSTREAM-SOURCE-11: > topics: > [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition] > children: [KTABLE-AGGREGATE-12] > KTABLE-AGGREGATE-12: > states: > [KTABLE-AGGREGATE-STATE-STORE-09] > children: [KTABLE-TOSTREAM-20] > KTABLE-TOSTREAM-20: > children: [KSTREAM-SINK-21] > KSTREAM-SINK-21: > topic: data_udr_month_customer_aggregration > KSTREAM-SOURCE-17: > topics: > [mystreams_app-KSTREAM-MAP-14-repartition] > children: [KSTREAM-LEFTJOIN-18] > KSTREAM-LEFTJOIN-18: > states: > [KTABLE-AGGREGATE-STATE-STORE-09] > children: [KSTREAM-SINK-19] > KSTREAM-SINK-19: > topic: data_UDR_joined > Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, > mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0] > {noformat} > *Why this matters* > The applications does quite a lot of preprocessing before joining with the > missing input topic. This preprocessing won't happen without the topic, > creating a huge backlog of data. 
> *Fix* > Issue a `warn` or `error` level message at start to inform about the missing > topic and its consequences. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
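Until Streams itself emits such a warning, an application can run its own pre-flight check before calling KafkaStreams#start(), comparing the topics the topology needs against the topics the cluster reports (e.g. via the admin client's topic listing). A minimal sketch of the set arithmetic; the class and method names are assumptions:

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of a pre-flight check: given the set of topics the topology requires
// and the set of topics that actually exist on the broker, compute the gap so
// the application can log it loudly instead of hanging silently.
public class InputTopicCheck {
    static Set<String> missingTopics(Set<String> required, Set<String> existing) {
        Set<String> missing = new HashSet<>(required);
        missing.removeAll(existing);
        return missing;
    }
}
```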
[jira] [Commented] (KAFKA-6412) Improve synchronization in CachingKeyValueStore methods
[ https://issues.apache.org/jira/browse/KAFKA-6412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320898#comment-16320898 ] Guozhang Wang commented on KAFKA-6412: -- [~tedyu] way to start 2018 indeed :) > Improve synchronization in CachingKeyValueStore methods > --- > > Key: KAFKA-6412 > URL: https://issues.apache.org/jira/browse/KAFKA-6412 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Ted Yu >Assignee: Ted Yu > Fix For: 1.1.0 > > Attachments: 6412-jmh.v1.txt, k-6412.v1.txt > > > Currently CachingKeyValueStore methods are synchronized at method level. > It seems we can use read lock for getter and write lock for put / delete > methods. > For getInternal(), if the underlying thread is streamThread, the > getInternal() may trigger eviction. This can be handled by obtaining write > lock at the beginning of the method for streamThread. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (KAFKA-6412) Improve synchronization in CachingKeyValueStore methods
[ https://issues.apache.org/jira/browse/KAFKA-6412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang reassigned KAFKA-6412: Assignee: Ted Yu > Improve synchronization in CachingKeyValueStore methods > --- > > Key: KAFKA-6412 > URL: https://issues.apache.org/jira/browse/KAFKA-6412 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Ted Yu >Assignee: Ted Yu > Fix For: 1.1.0 > > Attachments: 6412-jmh.v1.txt, k-6412.v1.txt > > > Currently CachingKeyValueStore methods are synchronized at method level. > It seems we can use read lock for getter and write lock for put / delete > methods. > For getInternal(), if the underlying thread is streamThread, the > getInternal() may trigger eviction. This can be handled by obtaining write > lock at the beginning of the method for streamThread. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6398) Non-aggregation KTable generation operator does not construct value getter correctly
[ https://issues.apache.org/jira/browse/KAFKA-6398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320888#comment-16320888 ] ASF GitHub Bot commented on KAFKA-6398: --- guozhangwang closed pull request #4384: KAFKA-6398: fix KTable.filter that does not include its parent's queryable storename URL: https://github.com/apache/kafka/pull/4384 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 8c79decbb6f..3bc6f4b3474 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -155,8 +155,10 @@ String internalStoreName() { builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name); if (storeSupplier != null) { builder.internalTopologyBuilder.addStateStore(storeSupplier, name); +return new KTableImpl<>(builder, name, processorSupplier, this.keySerde, this.valSerde, sourceNodes, internalStoreName, true); +} else { +return new KTableImpl<>(builder, name, processorSupplier, sourceNodes, this.queryableStoreName, false); } -return new KTableImpl<>(builder, name, processorSupplier, this.keySerde, this.valSerde, sourceNodes, internalStoreName, internalStoreName != null); } private KTabledoFilter(final Predicate predicate, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 39ea44f1bfd..39010022a0b 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -456,6 +456,7 @@ public final void addSource(final Topology.AutoOffsetReset offsetReset, } for (final String predecessor : predecessorNames) { +Objects.requireNonNull(predecessor, "predecessor name can't be null"); if (predecessor.equals(name)) { throw new TopologyException("Processor " + name + " cannot be a predecessor of itself."); } @@ -483,6 +484,7 @@ public final void addProcessor(final String name, } for (final String predecessor : predecessorNames) { +Objects.requireNonNull(predecessor, "predecessor name must not be null"); if (predecessor.equals(name)) { throw new TopologyException("Processor " + name + " cannot be a predecessor of itself."); } @@ -508,6 +510,7 @@ public final void addStateStore(final org.apache.kafka.streams.processor.StateSt if (processorNames != null) { for (final String processorName : processorNames) { +Objects.requireNonNull(processorName, "processor name must not be null"); connectProcessorAndStateStore(processorName, supplier.name()); } } @@ -524,6 +527,7 @@ public final void addStateStore(final StoreBuilder storeBuilder, if (processorNames != null) { for (final String processorName : processorNames) { +Objects.requireNonNull(processorName, "processor name must not be null"); connectProcessorAndStateStore(processorName, storeBuilder.name()); } } @@ -602,11 +606,12 @@ private void validateTopicNotAlreadyRegistered(final String topic) { public final void connectProcessorAndStateStores(final String processorName, final String... 
stateStoreNames) { Objects.requireNonNull(processorName, "processorName can't be null"); -Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be null"); +Objects.requireNonNull(stateStoreNames, "state store list must not be null"); if (stateStoreNames.length == 0) { throw new TopologyException("Must provide at least one state store name."); } for (final String stateStoreName : stateStoreNames) { +Objects.requireNonNull(stateStoreName, "state store name must not be null"); connectProcessorAndStateStore(processorName, stateStoreName); } } @@ -627,6 +632,7 @@ public final void connectProcessors(final String... processorNames) { } for (final String processorName :
[jira] [Commented] (KAFKA-6438) NSEE while concurrently creating and deleting a topic
[ https://issues.apache.org/jira/browse/KAFKA-6438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320694#comment-16320694 ] Ted Yu commented on KAFKA-6438: --- Two maps are accessed in updateMetadataRequestPartitionInfo(): controllerContext.partitionLeadershipInfo and controllerContext.partitionReplicaAssignment . Looks like we should check existence of {{ partition }} before proceeding. > NSEE while concurrently creating and deleting a topic > - > > Key: KAFKA-6438 > URL: https://issues.apache.org/jira/browse/KAFKA-6438 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 1.0.0 > Environment: kafka_2.11-1.0.0.jar > OpenJDK Runtime Environment (build 1.8.0_102-b14), OpenJDK 64-Bit Server VM > (build 25.102-b14, mixed mode) > CentOS Linux release 7.3.1611 (Core) >Reporter: Adam Kotwasinski > Labels: reliability > Fix For: 1.1.0 > > > It appears that deleting a topic and creating it at the same time can cause > NSEE, what later results in a forced controller shutdown. > Most probably topics are being created because consumers/producers are still > active (yes, this means the deletion is happening blindly). > The main problem here (for me) is the controller switch, the data loss and > following unclean election is acceptable (as we admit to deleting blindly). 
> Environment description: > 20 kafka brokers > 80k partitions (20k topics 4partitions each) > 3 node ZK > Incident: > {code:java} > [2018-01-09 11:19:05,912] INFO [Topic Deletion Manager 6], Partition deletion > callback for mytopic-2,mytopic-0,mytopic-1,mytopic-3 > (kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:06,237] INFO [Controller id=6] New leader and ISR for > partition mytopic-0 is {"leader":-1,"leader_epoch":1,"isr":[]} > (kafka.controller.KafkaController) > [2018-01-09 11:19:06,412] INFO [Topic Deletion Manager 6], Deletion for > replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of > topic mytopic in progress (kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:07,218] INFO [Topic Deletion Manager 6], Deletion for > replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of > topic mytopic in progress (kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:07,304] INFO [Topic Deletion Manager 6], Deletion for > replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of > topic mytopic in progress (kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:07,383] INFO [Topic Deletion Manager 6], Deletion for > replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of > topic mytopic in progress (kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:07,510] INFO [Topic Deletion Manager 6], Deletion for > replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of > topic mytopic in progress (kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:07,661] INFO [Topic Deletion Manager 6], Deletion for > replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of > topic mytopic in progress (kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:07,728] INFO [Topic Deletion Manager 6], Deletion for > replicas 9,10,11 for partition mytopic-0,mytopic-1,mytopic-2 of topic mytopic > in progress 
(kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:07,924] INFO [PartitionStateMachine controllerId=6] > Invoking state change to OfflinePartition for partitions > mytopic-2,mytopic-0,mytopic-1,mytopic-3 > (kafka.controller.PartitionStateMachine) > [2018-01-09 11:19:07,924] INFO [PartitionStateMachine controllerId=6] > Invoking state change to NonExistentPartition for partitions > mytopic-2,mytopic-0,mytopic-1,mytopic-3 > (kafka.controller.PartitionStateMachine) > [2018-01-09 11:19:08,592] INFO [Controller id=6] New topics: [Set(mytopic, > other, other2)], deleted topics: [Set()], new partition replica assignment > [Map(other-0 -> Vector(8), mytopic-2 -> Vector(6), mytopic-0 -> Vector(4), > other-2 -> Vector(10), mytopic-1 -> Vector(5), mytopic-3 -> Vector(7), > other-1 -> Vector(9), other-3 -> Vector(11))] > (kafka.controller.KafkaController) > [2018-01-09 11:19:08,593] INFO [Controller id=6] New topic creation callback > for other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 > (kafka.controller.KafkaController) > [2018-01-09 11:19:08,596] INFO [Controller id=6] New partition creation > callback for > other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 > (kafka.controller.KafkaController) > [2018-01-09 11:19:08,596] INFO [PartitionStateMachine controllerId=6] > Invoking state change to NewPartition for partitions >
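The guard suggested in the comment above ("check existence of {{partition}} before proceeding") amounts to the following pattern. The controller itself is Scala, where `map(key)` throws NoSuchElementException for a missing key; this Java analogue is purely illustrative, and its class and method names are assumptions, not controller code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Illustrative analogue of the suggested fix: guarding map access with an
// existence check lets the controller skip a partition that was deleted
// concurrently, instead of crashing on the missing key.
public class LeadershipLookup {
    private final Map<String, String> partitionLeadershipInfo = new HashMap<>();

    void record(String partition, String leaderAndIsr) {
        partitionLeadershipInfo.put(partition, leaderAndIsr);
    }

    // Unguarded access, mimicking Scala's map(partition):
    String leaderOrThrow(String partition) {
        String info = partitionLeadershipInfo.get(partition);
        if (info == null) {
            throw new NoSuchElementException("key not found: " + partition);
        }
        return info;
    }

    // Guarded access, as the comment proposes: check existence first.
    String leaderOrNull(String partition) {
        return partitionLeadershipInfo.containsKey(partition)
                ? partitionLeadershipInfo.get(partition)
                : null;
    }
}
```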
[jira] [Commented] (KAFKA-6437) Streams does not warn about missing input topics, but hangs
[ https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320675#comment-16320675 ] Chris Schwarzfischer commented on KAFKA-6437: - Yep, I know it's by design and that doesn't need to change, of course. "It hangs in the middle" means that the application is actually starting and processing data up to some intermediate topic. This makes it easy to overlook that there are topics missing that prevent the application from running correctly. It would make it a lot easier to spot this error if there was an error message saying that the topic is missing instead of simply switching to "RUNNING" as if everything was ok… > Streams does not warn about missing input topics, but hangs > --- > > Key: KAFKA-6437 > URL: https://issues.apache.org/jira/browse/KAFKA-6437 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 > Environment: Single client on single node broker >Reporter: Chris Schwarzfischer >Priority: Minor > > *Case* > Streams application with two input topics being used for a left join. > When the left side topic is missing upon starting the streams application, it > hangs "in the middle" of the topology (at …9, see below). Only parts of > the intermediate topics are created (up to …9) > When the missing input topic is created, the streams application resumes > processing. 
> {noformat} > Topology: > StreamsTask taskId: 2_0 > ProcessorTopology: > KSTREAM-SOURCE-11: > topics: > [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition] > children: [KTABLE-AGGREGATE-12] > KTABLE-AGGREGATE-12: > states: > [KTABLE-AGGREGATE-STATE-STORE-09] > children: [KTABLE-TOSTREAM-20] > KTABLE-TOSTREAM-20: > children: [KSTREAM-SINK-21] > KSTREAM-SINK-21: > topic: data_udr_month_customer_aggregration > KSTREAM-SOURCE-17: > topics: > [mystreams_app-KSTREAM-MAP-14-repartition] > children: [KSTREAM-LEFTJOIN-18] > KSTREAM-LEFTJOIN-18: > states: > [KTABLE-AGGREGATE-STATE-STORE-09] > children: [KSTREAM-SINK-19] > KSTREAM-SINK-19: > topic: data_UDR_joined > Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, > mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0] > {noformat} > *Why this matters* > The applications does quite a lot of preprocessing before joining with the > missing input topic. This preprocessing won't happen without the topic, > creating a huge backlog of data. > *Fix* > Issue an `warn` or `error` level message at start to inform about the missing > topic and it's consequences. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-6438) NSEE while concurrently creating and deleting a topic
[ https://issues.apache.org/jira/browse/KAFKA-6438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-6438: --- Fix Version/s: 1.1.0 > NSEE while concurrently creating and deleting a topic > - > > Key: KAFKA-6438 > URL: https://issues.apache.org/jira/browse/KAFKA-6438 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 1.0.0 > Environment: kafka_2.11-1.0.0.jar > OpenJDK Runtime Environment (build 1.8.0_102-b14), OpenJDK 64-Bit Server VM > (build 25.102-b14, mixed mode) > CentOS Linux release 7.3.1611 (Core) >Reporter: Adam Kotwasinski > Labels: reliability > Fix For: 1.1.0 > > > It appears that deleting a topic and creating it at the same time can cause > NSEE, what later results in a forced controller shutdown. > Most probably topics are being created because consumers/producers are still > active (yes, this means the deletion is happening blindly). > The main problem here (for me) is the controller switch, the data loss and > following unclean election is acceptable (as we admit to deleting blindly). 
> Environment description: > 20 kafka brokers > 80k partitions (20k topics 4partitions each) > 3 node ZK > Incident: > {code:java} > [2018-01-09 11:19:05,912] INFO [Topic Deletion Manager 6], Partition deletion > callback for mytopic-2,mytopic-0,mytopic-1,mytopic-3 > (kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:06,237] INFO [Controller id=6] New leader and ISR for > partition mytopic-0 is {"leader":-1,"leader_epoch":1,"isr":[]} > (kafka.controller.KafkaController) > [2018-01-09 11:19:06,412] INFO [Topic Deletion Manager 6], Deletion for > replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of > topic mytopic in progress (kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:07,218] INFO [Topic Deletion Manager 6], Deletion for > replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of > topic mytopic in progress (kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:07,304] INFO [Topic Deletion Manager 6], Deletion for > replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of > topic mytopic in progress (kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:07,383] INFO [Topic Deletion Manager 6], Deletion for > replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of > topic mytopic in progress (kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:07,510] INFO [Topic Deletion Manager 6], Deletion for > replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of > topic mytopic in progress (kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:07,661] INFO [Topic Deletion Manager 6], Deletion for > replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of > topic mytopic in progress (kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:07,728] INFO [Topic Deletion Manager 6], Deletion for > replicas 9,10,11 for partition mytopic-0,mytopic-1,mytopic-2 of topic mytopic > in progress 
(kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:07,924] INFO [PartitionStateMachine controllerId=6] > Invoking state change to OfflinePartition for partitions > mytopic-2,mytopic-0,mytopic-1,mytopic-3 > (kafka.controller.PartitionStateMachine) > [2018-01-09 11:19:07,924] INFO [PartitionStateMachine controllerId=6] > Invoking state change to NonExistentPartition for partitions > mytopic-2,mytopic-0,mytopic-1,mytopic-3 > (kafka.controller.PartitionStateMachine) > [2018-01-09 11:19:08,592] INFO [Controller id=6] New topics: [Set(mytopic, > other, other2)], deleted topics: [Set()], new partition replica assignment > [Map(other-0 -> Vector(8), mytopic-2 -> Vector(6), mytopic-0 -> Vector(4), > other-2 -> Vector(10), mytopic-1 -> Vector(5), mytopic-3 -> Vector(7), > other-1 -> Vector(9), other-3 -> Vector(11))] > (kafka.controller.KafkaController) > [2018-01-09 11:19:08,593] INFO [Controller id=6] New topic creation callback > for other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 > (kafka.controller.KafkaController) > [2018-01-09 11:19:08,596] INFO [Controller id=6] New partition creation > callback for > other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 > (kafka.controller.KafkaController) > [2018-01-09 11:19:08,596] INFO [PartitionStateMachine controllerId=6] > Invoking state change to NewPartition for partitions > other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 > (kafka.controller.PartitionStateMachine) > [2018-01-09 11:19:08,642] INFO [PartitionStateMachine controllerId=6] > Invoking state change to OnlinePartition for partitions >
[jira] [Updated] (KAFKA-6438) NSEE while concurrently creating and deleting a topic
[ https://issues.apache.org/jira/browse/KAFKA-6438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-6438: --- Labels: reliability (was: ) > NSEE while concurrently creating and deleting a topic > - > > Key: KAFKA-6438 > URL: https://issues.apache.org/jira/browse/KAFKA-6438 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 1.0.0 > Environment: kafka_2.11-1.0.0.jar > OpenJDK Runtime Environment (build 1.8.0_102-b14), OpenJDK 64-Bit Server VM > (build 25.102-b14, mixed mode) > CentOS Linux release 7.3.1611 (Core) >Reporter: Adam Kotwasinski > Labels: reliability > Fix For: 1.1.0 > > > It appears that deleting a topic and creating it at the same time can cause > NSEE, what later results in a forced controller shutdown. > Most probably topics are being created because consumers/producers are still > active (yes, this means the deletion is happening blindly). > The main problem here (for me) is the controller switch, the data loss and > following unclean election is acceptable (as we admit to deleting blindly). 
> Environment description: > 20 kafka brokers > 80k partitions (20k topics 4partitions each) > 3 node ZK > Incident: > {code:java} > [2018-01-09 11:19:05,912] INFO [Topic Deletion Manager 6], Partition deletion > callback for mytopic-2,mytopic-0,mytopic-1,mytopic-3 > (kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:06,237] INFO [Controller id=6] New leader and ISR for > partition mytopic-0 is {"leader":-1,"leader_epoch":1,"isr":[]} > (kafka.controller.KafkaController) > [2018-01-09 11:19:06,412] INFO [Topic Deletion Manager 6], Deletion for > replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of > topic mytopic in progress (kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:07,218] INFO [Topic Deletion Manager 6], Deletion for > replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of > topic mytopic in progress (kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:07,304] INFO [Topic Deletion Manager 6], Deletion for > replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of > topic mytopic in progress (kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:07,383] INFO [Topic Deletion Manager 6], Deletion for > replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of > topic mytopic in progress (kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:07,510] INFO [Topic Deletion Manager 6], Deletion for > replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of > topic mytopic in progress (kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:07,661] INFO [Topic Deletion Manager 6], Deletion for > replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of > topic mytopic in progress (kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:07,728] INFO [Topic Deletion Manager 6], Deletion for > replicas 9,10,11 for partition mytopic-0,mytopic-1,mytopic-2 of topic mytopic > in progress 
(kafka.controller.TopicDeletionManager) > [2018-01-09 11:19:07,924] INFO [PartitionStateMachine controllerId=6] > Invoking state change to OfflinePartition for partitions > mytopic-2,mytopic-0,mytopic-1,mytopic-3 > (kafka.controller.PartitionStateMachine) > [2018-01-09 11:19:07,924] INFO [PartitionStateMachine controllerId=6] > Invoking state change to NonExistentPartition for partitions > mytopic-2,mytopic-0,mytopic-1,mytopic-3 > (kafka.controller.PartitionStateMachine) > [2018-01-09 11:19:08,592] INFO [Controller id=6] New topics: [Set(mytopic, > other, other2)], deleted topics: [Set()], new partition replica assignment > [Map(other-0 -> Vector(8), mytopic-2 -> Vector(6), mytopic-0 -> Vector(4), > other-2 -> Vector(10), mytopic-1 -> Vector(5), mytopic-3 -> Vector(7), > other-1 -> Vector(9), other-3 -> Vector(11))] > (kafka.controller.KafkaController) > [2018-01-09 11:19:08,593] INFO [Controller id=6] New topic creation callback > for other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 > (kafka.controller.KafkaController) > [2018-01-09 11:19:08,596] INFO [Controller id=6] New partition creation > callback for > other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 > (kafka.controller.KafkaController) > [2018-01-09 11:19:08,596] INFO [PartitionStateMachine controllerId=6] > Invoking state change to NewPartition for partitions > other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 > (kafka.controller.PartitionStateMachine) > [2018-01-09 11:19:08,642] INFO [PartitionStateMachine controllerId=6] > Invoking state change to OnlinePartition for partitions >
[jira] [Updated] (KAFKA-6438) NSEE while concurrently creating and deleting a topic
[ https://issues.apache.org/jira/browse/KAFKA-6438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Adam Kotwasinski updated KAFKA-6438: Description: It appears that deleting a topic and creating it at the same time can cause NSEE, what later results in a forced controller shutdown. Most probably topics are being created because consumers/producers are still active (yes, this means the deletion is happening blindly). The main problem here (for me) is the controller switch, the data loss and following unclean election is acceptable (as we admit to deleting blindly). Environment description: 20 kafka brokers 80k partitions (20k topics 4partitions each) 3 node ZK Incident: {code:java} [2018-01-09 11:19:05,912] INFO [Topic Deletion Manager 6], Partition deletion callback for mytopic-2,mytopic-0,mytopic-1,mytopic-3 (kafka.controller.TopicDeletionManager) [2018-01-09 11:19:06,237] INFO [Controller id=6] New leader and ISR for partition mytopic-0 is {"leader":-1,"leader_epoch":1,"isr":[]} (kafka.controller.KafkaController) [2018-01-09 11:19:06,412] INFO [Topic Deletion Manager 6], Deletion for replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of topic mytopic in progress (kafka.controller.TopicDeletionManager) [2018-01-09 11:19:07,218] INFO [Topic Deletion Manager 6], Deletion for replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of topic mytopic in progress (kafka.controller.TopicDeletionManager) [2018-01-09 11:19:07,304] INFO [Topic Deletion Manager 6], Deletion for replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of topic mytopic in progress (kafka.controller.TopicDeletionManager) [2018-01-09 11:19:07,383] INFO [Topic Deletion Manager 6], Deletion for replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of topic mytopic in progress (kafka.controller.TopicDeletionManager) [2018-01-09 11:19:07,510] INFO [Topic Deletion Manager 6], Deletion for replicas 
12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of topic mytopic in progress (kafka.controller.TopicDeletionManager) [2018-01-09 11:19:07,661] INFO [Topic Deletion Manager 6], Deletion for replicas 12,9,10,11 for partition mytopic-3,mytopic-0,mytopic-1,mytopic-2 of topic mytopic in progress (kafka.controller.TopicDeletionManager) [2018-01-09 11:19:07,728] INFO [Topic Deletion Manager 6], Deletion for replicas 9,10,11 for partition mytopic-0,mytopic-1,mytopic-2 of topic mytopic in progress (kafka.controller.TopicDeletionManager) [2018-01-09 11:19:07,924] INFO [PartitionStateMachine controllerId=6] Invoking state change to OfflinePartition for partitions mytopic-2,mytopic-0,mytopic-1,mytopic-3 (kafka.controller.PartitionStateMachine) [2018-01-09 11:19:07,924] INFO [PartitionStateMachine controllerId=6] Invoking state change to NonExistentPartition for partitions mytopic-2,mytopic-0,mytopic-1,mytopic-3 (kafka.controller.PartitionStateMachine) [2018-01-09 11:19:08,592] INFO [Controller id=6] New topics: [Set(mytopic, other, other2)], deleted topics: [Set()], new partition replica assignment [Map(other-0 -> Vector(8), mytopic-2 -> Vector(6), mytopic-0 -> Vector(4), other-2 -> Vector(10), mytopic-1 -> Vector(5), mytopic-3 -> Vector(7), other-1 -> Vector(9), other-3 -> Vector(11))] (kafka.controller.KafkaController) [2018-01-09 11:19:08,593] INFO [Controller id=6] New topic creation callback for other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 (kafka.controller.KafkaController) [2018-01-09 11:19:08,596] INFO [Controller id=6] New partition creation callback for other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 (kafka.controller.KafkaController) [2018-01-09 11:19:08,596] INFO [PartitionStateMachine controllerId=6] Invoking state change to NewPartition for partitions other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 (kafka.controller.PartitionStateMachine) [2018-01-09 11:19:08,642] INFO 
[PartitionStateMachine controllerId=6] Invoking state change to OnlinePartition for partitions other-0,mytopic-2,mytopic-0,other-2,mytopic-1,mytopic-3,other-1,other-3 (kafka.controller.PartitionStateMachine) [2018-01-09 11:19:08,828] INFO [Topic Deletion Manager 6], Partition deletion callback for mytopic-2,mytopic-0,mytopic-1,mytopic-3 (kafka.controller.TopicDeletionManager) [2018-01-09 11:19:09,127] INFO [Controller id=6] New leader and ISR for partition mytopic-0 is {"leader":-1,"leader_epoch":1,"isr":[]} (kafka.controller.KafkaController) [2018-01-09 11:19:09,607] ERROR [controller-event-thread]: Error processing event TopicDeletion(Set(mytopic, other)) (kafka.controller.Contr ollerEventManager$ControllerEventThread) java.util.NoSuchElementException: key not found: mytopic-0 at scala.collection.MapLike$class.default(MapLike.scala:228) at
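The {{java.util.NoSuchElementException: key not found: mytopic-0}} above is a lookup in the controller's partition-state map after a concurrent deletion path removed the key. Reduced to its essence, the failure mode looks like the sketch below; this is an illustrative analogy with invented names, not the controller's actual code, which keeps this state in a Scala Map whose default lookup throws exactly this exception:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

public class PartitionStateRace {
    private final Map<String, String> partitionState = new HashMap<>();

    public PartitionStateRace() {
        partitionState.put("mytopic-0", "OnlinePartition");
    }

    // Mimics the Scala Map apply() in the stack trace: a lookup that throws
    // NoSuchElementException instead of returning null when the key is absent.
    public String stateOf(String partition) {
        String s = partitionState.get(partition);
        if (s == null) {
            throw new NoSuchElementException("key not found: " + partition);
        }
        return s;
    }

    // The topic-deletion path drops the entry from the shared state map.
    public void deleteTopicPartitions() {
        partitionState.remove("mytopic-0");
    }

    public static void main(String[] args) {
        PartitionStateRace controller = new PartitionStateRace();
        controller.deleteTopicPartitions();   // deletion wins the race...
        try {
            controller.stateOf("mytopic-0");  // ...while the creation path still reads
        } catch (NoSuchElementException e) {
            System.out.println(e.getMessage()); // key not found: mytopic-0
        }
    }
}
```

In the real controller the two paths run on the single controller-event thread, so the interleaving happens between events (delete callback, then re-creation callback) rather than between threads, but the read-after-delete shape is the same.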
[jira] [Commented] (KAFKA-6437) Streams does not warn about missing input topics, but hangs
[ https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320641#comment-16320641 ] Matthias J. Sax commented on KAFKA-6437: I cannot follow. What do you mean by "it hangs in the middle"? Also note that the behavior you describe is "by design" because the used consumer works this way. It's also well documented that you need to create all input topics before you start your application. We can of course do more logging, but this does not really "solve" the problem... > Streams does not warn about missing input topics, but hangs > --- > > Key: KAFKA-6437 > URL: https://issues.apache.org/jira/browse/KAFKA-6437 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 > Environment: Single client on single node broker >Reporter: Chris Schwarzfischer >Priority: Minor > > *Case* > Streams application with two input topics being used for a left join. > When the left side topic is missing upon starting the streams application, it > hangs "in the middle" of the topology (at …9, see below). Only parts of > the intermediate topics are created (up to …9) > When the missing input topic is created, the streams application resumes > processing. 
> {noformat} > Topology: > StreamsTask taskId: 2_0 > ProcessorTopology: > KSTREAM-SOURCE-11: > topics: > [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition] > children: [KTABLE-AGGREGATE-12] > KTABLE-AGGREGATE-12: > states: > [KTABLE-AGGREGATE-STATE-STORE-09] > children: [KTABLE-TOSTREAM-20] > KTABLE-TOSTREAM-20: > children: [KSTREAM-SINK-21] > KSTREAM-SINK-21: > topic: data_udr_month_customer_aggregration > KSTREAM-SOURCE-17: > topics: > [mystreams_app-KSTREAM-MAP-14-repartition] > children: [KSTREAM-LEFTJOIN-18] > KSTREAM-LEFTJOIN-18: > states: > [KTABLE-AGGREGATE-STATE-STORE-09] > children: [KSTREAM-SINK-19] > KSTREAM-SINK-19: > topic: data_UDR_joined > Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, > mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0] > {noformat} > *Why this matters* > The application does quite a lot of preprocessing before joining with the > missing input topic. This preprocessing won't happen without the topic, > creating a huge backlog of data. > *Fix* > Issue a `warn`- or `error`-level message at start to inform about the missing > topic and its consequences. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
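The fix proposed in the report boils down to comparing the topology's required input topics against what the broker actually has before calling {{KafkaStreams#start()}}, and warning (or failing fast) on a mismatch. A minimal sketch of that comparison follows; the topic names are hypothetical, and obtaining the broker's topic list (e.g. via the AdminClient) is deliberately left out:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class InputTopicCheck {
    // Returns the subset of required topics the broker does not know about.
    public static Set<String> missingTopics(Set<String> required, Set<String> existing) {
        Set<String> missing = new HashSet<>(required);
        missing.removeAll(existing);
        return missing;
    }

    public static void main(String[] args) {
        // Hypothetical inputs of a left join: a stream side and a table side.
        Set<String> required = new HashSet<>(Arrays.asList("input-left", "input-right"));
        // What the broker reports; here the left-side topic was never created.
        Set<String> onBroker = new HashSet<>(Arrays.asList("input-right"));

        Set<String> missing = missingTopics(required, onBroker);
        if (!missing.isEmpty()) {
            // Warn (or throw) instead of hanging silently mid-topology.
            System.err.println("WARN: missing input topics: " + missing);
        }
    }
}
```

Running such a check at application startup converts the silent hang described above into an actionable log line.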
[jira] [Commented] (KAFKA-3625) Move kafka-streams test fixtures into a published package
[ https://issues.apache.org/jira/browse/KAFKA-3625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320631#comment-16320631 ] Matthias J. Sax commented on KAFKA-3625: Thanks a lot for this feedback! This is super helpful! The artifact you are using atm is not public API, and thus there is no guarantee that your tests won't break if you upgrade. (Additionally, you pull in all Kafka Streams unit tests that you are actually not interested in.) Thus, we want to have a public {{streams-test-utils}} package. About serialization -- I completely understand that this is annoying, but we cannot easily avoid it... But we try to minimize the required boilerplate code. Hope you participate in the KIP discussion that I want to start on the mailing list this week. > Move kafka-streams test fixtures into a published package > - > > Key: KAFKA-3625 > URL: https://issues.apache.org/jira/browse/KAFKA-3625 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Jeff Klukas >Assignee: Matthias J. Sax >Priority: Minor > Labels: needs-kip, user-experience > > The KStreamTestDriver and related fixtures defined in > streams/src/test/java/org/apache/kafka/test would be useful to developers > building applications on top of Kafka Streams, but they are not currently > exposed in a package. > I propose moving this directory to live under streams/fixtures/src/main and > creating a new 'streams:fixtures' project in the gradle configuration to > publish these as a separate package. > KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-3625) Move kafka-streams test fixtures into a published package
[ https://issues.apache.org/jira/browse/KAFKA-3625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320605#comment-16320605 ] Scott Davis edited comment on KAFKA-3625 at 1/10/18 4:45 PM: - FYI I just set up some unit tests using the (not documented) internal test classes in Kafka Streams, and I thought I'd share some thoughts on my experience with it. First, it's super-helpful! The high-level DSL style of Kafka Streams applications doesn't fit well into standard unit testing frameworks, and this solves that problem. It's also helpful compared with integration testing in the sense that it isn't necessary to produce source messages on a broker, wait for timeouts, etc. The internal test classes in 1.0.0 require the unit tests to provide serializers. This can create some extra boilerplate work to configure the serializers (especially the Confluent AVRO Serde, which requires a schema registry). However, I only need to unit test my application logic (i.e. the contents of the "filter" and "map" methods, etc). I can see how testing serializers is a requirement for testing Kafka Streams internally, but it isn't a requirement for testing the logic of Kafka Streams applications. Note: To work around this, I created a "JavaObjectSerde", which uses java.io.Object(In|Out)putStream, for use by the unit tests. I needed about 10 lines of boilerplate code in an @Before method to set up the ProcessorTopologyTestDriver, which seemed slightly excessive but not burdensome. Most of it was to create the serializers and the StreamsConfig, which aren't part of my application logic but were required to create the test driver. I was able to get it to work by adding {{org.apache.kafka:kafka-streams:1.0.0:test}} to my build.gradle, so it seems there already is an artifact. However, there is no documentation, which seems to me like the biggest drawback. I learned it by reading the Kafka Streams source code. 
was (Author: scott.davis): FYI I just setup some unit tests using the (not documented) internal test classes in Kafka Streams, and I thought I'd share some thoughts on my experience with it. First, it's super-helpful! The high-level DSL style of Kafka Streams applications doesn't fit well into standard unit testing frameworks, and this solves that problem. It's also helpful compared with integration testing in the sense that it isn't necessary to produce source messages on a broker, wait for timeouts, etc. The internal test classes in 1.0.0 require the unit tests to provide serializers. This can create some extra boilerplate work to configure the serializers (especially the Confluent AVRO Serde, which requires a schema registry). However, I only need to unit test my application logic (i.e. the contents of the "filter" and "map" methods, etc). I can see how testing serializers is a requirement for testing Kafka Streams internally, but it isn't a requirement for testing the logic of Kafka Streams applications. Note: To work around this, I created a "JavaObjectSerde", which uses java.io.Object(In|Out)putStream, for use by the unit tests. I needed about 10 lines of boilerplate code in an @Before method to setup the ProcessorTopologyTestDriver, which seemed slightly excessive but not burdensome. Most of it was to create the serializers and the StreamsConfig, which aren't part of my application logic but were required to create the test driver. > Move kafka-streams test fixtures into a published package > - > > Key: KAFKA-3625 > URL: https://issues.apache.org/jira/browse/KAFKA-3625 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Jeff Klukas >Assignee: Matthias J. 
Sax >Priority: Minor > Labels: needs-kip, user-experience > > The KStreamTestDriver and related fixtures defined in > streams/src/test/java/org/apache/kafka/test would be useful to developers > building applications on top of Kafka Streams, but they are not currently > exposed in a package. > I propose moving this directory to live under streams/fixtures/src/main and > creating a new 'streams:fixtures' project in the gradle configuration to > publish these as a separate package. > KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams -- This message was sent by Atlassian JIRA (v6.4.14#64029)
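The "JavaObjectSerde" workaround mentioned in the comment above can be sketched with plain {{java.io}} object streams. Only the round-trip helpers are shown here; wrapping them in Kafka's Serializer/Deserializer interfaces is the remaining (omitted) boilerplate, and this is strictly a test-only trick, since Java serialization is neither compact nor cross-language:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class JavaObjectSerde {
    // Serialize any Serializable value to bytes, for use in test fixtures.
    public static byte[] serialize(Object value) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(value);
            out.flush(); // push buffered block data into the byte array
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // Inverse of serialize(): rebuild the object graph from the bytes.
    public static Object deserialize(byte[] bytes) {
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = serialize("hello-streams");
        System.out.println(deserialize(bytes)); // hello-streams
    }
}
```

Because any {{Serializable}} type round-trips through these two helpers, the unit tests never need a schema registry or topic-specific serializer configuration.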
[jira] [Commented] (KAFKA-3625) Move kafka-streams test fixtures into a published package
[ https://issues.apache.org/jira/browse/KAFKA-3625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320605#comment-16320605 ] Scott Davis commented on KAFKA-3625: FYI I just setup some unit tests using the (not documented) internal test classes in Kafka Streams, and I thought I'd share some thoughts on my experience with it. First, it's super-helpful! The high-level DSL style of Kafka Streams applications doesn't fit well into standard unit testing frameworks, and this solves that problem. It's also helpful compared with integration testing in the sense that it isn't necessary to produce source messages on a broker, wait for timeouts, etc. The internal test classes in 1.0.0 require the unit tests to provide serializers. This can create some extra boilerplate work to configure the serializers (especially the Confluent AVRO Serde, which requires a schema registry). However, I only need to unit test my application logic (i.e. the contents of the "filter" and "map" methods, etc). I can see how testing serializers is a requirement for testing Kafka Streams internally, but it isn't a requirement for testing the logic of Kafka Streams applications. Note: To work around this, I created a "JavaObjectSerde", which uses java.io.Object(In|Out)putStream, for use by the unit tests. I needed about 10 lines of boilerplate code in an @Before method to setup the ProcessorTopologyTestDriver, which seemed slightly excessive but not burdensome. Most of it was to create the serializers and the StreamsConfig, which aren't part of my application logic but were required to create the test driver. > Move kafka-streams test fixtures into a published package > - > > Key: KAFKA-3625 > URL: https://issues.apache.org/jira/browse/KAFKA-3625 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Jeff Klukas >Assignee: Matthias J. 
Sax >Priority: Minor > Labels: needs-kip, user-experience > > The KStreamTestDriver and related fixtures defined in > streams/src/test/java/org/apache/kafka/test would be useful to developers > building applications on top of Kafka Streams, but they are not currently > exposed in a package. > I propose moving this directory to live under streams/fixtures/src/main and > creating a new 'streams:fixtures' project in the gradle configuration to > publish these as a separate package. > KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-6439) "com.streamsets.pipeline.api.StageException: KAFKA_50 - Error writing data to the Kafka broker: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.Ne
[ https://issues.apache.org/jira/browse/KAFKA-6439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] srithar durairaj updated KAFKA-6439: Affects Version/s: 0.10.0.1 Environment: Ubuntu 64bit Description: We are using streamset to produce data into kafka topic (3 node cluster). We are facing following error frequently in production. "com.streamsets.pipeline.api.StageException: KAFKA_50 - Error writing data to the Kafka broker: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received" Component/s: network Summary: "com.streamsets.pipeline.api.StageException: KAFKA_50 - Error writing data to the Kafka broker: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received" (was: We are using streamset to produce data into kafka topic (3 node cluster). We are facing following error frequently in production. ) > "com.streamsets.pipeline.api.StageException: KAFKA_50 - Error writing data to > the Kafka broker: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.NetworkException: The server disconnected > before a response was received" > - > > Key: KAFKA-6439 > URL: https://issues.apache.org/jira/browse/KAFKA-6439 > Project: Kafka > Issue Type: Bug > Components: network >Affects Versions: 0.10.0.1 > Environment: Ubuntu 64bit >Reporter: srithar durairaj > > We are using streamset to produce data into kafka topic (3 node cluster). We > are facing following error frequently in production. > "com.streamsets.pipeline.api.StageException: KAFKA_50 - Error writing data to > the Kafka broker: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.NetworkException: The server disconnected > before a response was received" -- This message was sent by Atlassian JIRA (v6.4.14#64029)
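Transient {{NetworkException: The server disconnected before a response was received}} errors on produce are usually handled by letting the producer client retry rather than surfacing the first failure to the application. A hedged example of producer settings commonly tuned for this; the values are illustrative, not recommendations for this particular cluster:

```properties
# Retry transient send failures instead of failing the batch immediately.
retries=5
retry.backoff.ms=500
# Wait for full ISR acknowledgement so a disconnect mid-write is retried safely.
acks=all
# With retries enabled, cap in-flight requests to preserve per-partition ordering.
max.in.flight.requests.per.connection=1
```

If disconnects persist at this frequency, broker-side settings (e.g. {{connections.max.idle.ms}}) and the network path between StreamSets and the brokers are worth inspecting as well.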
[jira] [Created] (KAFKA-6439) We are using streamset to produce data into kafka topic (3 node cluster). We are facing following error frequently in production.
srithar durairaj created KAFKA-6439: --- Summary: We are using StreamSets to produce data into a Kafka topic (3-node cluster). We are facing the following error frequently in production. Key: KAFKA-6439 URL: https://issues.apache.org/jira/browse/KAFKA-6439 Project: Kafka Issue Type: Bug Reporter: srithar durairaj -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6437) Streams does not warn about missing input topics, but hangs
[ https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320487#comment-16320487 ] Bill Bejeck commented on KAFKA-6437: [~k1th] thanks for reporting. > Streams does not warn about missing input topics, but hangs > --- > > Key: KAFKA-6437 > URL: https://issues.apache.org/jira/browse/KAFKA-6437 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 > Environment: Single client on single node broker >Reporter: Chris Schwarzfischer >Priority: Minor > > *Case* > Streams application with two input topics being used for a left join. > When the left side topic is missing upon starting the streams application, it > hangs "in the middle" of the topology (at …9, see below). Only parts of > the intermediate topics are created (up to …9) > When the missing input topic is created, the streams application resumes > processing. > {noformat} > Topology: > StreamsTask taskId: 2_0 > ProcessorTopology: > KSTREAM-SOURCE-11: > topics: > [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition] > children: [KTABLE-AGGREGATE-12] > KTABLE-AGGREGATE-12: > states: > [KTABLE-AGGREGATE-STATE-STORE-09] > children: [KTABLE-TOSTREAM-20] > KTABLE-TOSTREAM-20: > children: [KSTREAM-SINK-21] > KSTREAM-SINK-21: > topic: data_udr_month_customer_aggregration > KSTREAM-SOURCE-17: > topics: > [mystreams_app-KSTREAM-MAP-14-repartition] > children: [KSTREAM-LEFTJOIN-18] > KSTREAM-LEFTJOIN-18: > states: > [KTABLE-AGGREGATE-STATE-STORE-09] > children: [KSTREAM-SINK-19] > KSTREAM-SINK-19: > topic: data_UDR_joined > Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, > mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0] > {noformat} > *Why this matters* > The applications does quite a lot of preprocessing before joining with the > missing input topic. This preprocessing won't happen without the topic, > creating a huge backlog of data. 
> *Fix* > Issue a `warn`- or `error`-level message at start to inform about the missing > topic and its consequences. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-6437) Streams does not warn about missing input topics, but hangs
[ https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Schwarzfischer updated KAFKA-6437: Issue Type: Improvement (was: Bug) > Streams does not warn about missing input topics, but hangs > --- > > Key: KAFKA-6437 > URL: https://issues.apache.org/jira/browse/KAFKA-6437 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 > Environment: Single client on single node broker >Reporter: Chris Schwarzfischer >Priority: Minor > > *Case* > Streams application with two input topics being used for a left join. > When the left side topic is missing upon starting the streams application, it > hangs "in the middle" of the topology (at …9, see below). Only parts of > the intermediate topics are created (up to …9) > When the missing input topic is created, the streams application resumes > processing. > {noformat} > Topology: > StreamsTask taskId: 2_0 > ProcessorTopology: > KSTREAM-SOURCE-11: > topics: > [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition] > children: [KTABLE-AGGREGATE-12] > KTABLE-AGGREGATE-12: > states: > [KTABLE-AGGREGATE-STATE-STORE-09] > children: [KTABLE-TOSTREAM-20] > KTABLE-TOSTREAM-20: > children: [KSTREAM-SINK-21] > KSTREAM-SINK-21: > topic: data_udr_month_customer_aggregration > KSTREAM-SOURCE-17: > topics: > [mystreams_app-KSTREAM-MAP-14-repartition] > children: [KSTREAM-LEFTJOIN-18] > KSTREAM-LEFTJOIN-18: > states: > [KTABLE-AGGREGATE-STATE-STORE-09] > children: [KSTREAM-SINK-19] > KSTREAM-SINK-19: > topic: data_UDR_joined > Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, > mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0] > {noformat} > *Why this matters* > The applications does quite a lot of preprocessing before joining with the > missing input topic. This preprocessing won't happen without the topic, > creating a huge backlog of data. 
> *Fix* > Issue a `warn`- or `error`-level message at start to inform about the missing > topic and its consequences. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-6437) Streams does not warn about missing input topics, but hangs
[ https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Schwarzfischer updated KAFKA-6437: Description: *Case* Streams application with two input topics being used for a left join. When the left side topic is missing upon starting the streams application, it hangs "in the middle" of the topology (at …9, see below). Only parts of the intermediate topics are created (up to …9) When the missing input topic is created, the streams application resumes processing. {noformat} Topology: StreamsTask taskId: 2_0 ProcessorTopology: KSTREAM-SOURCE-11: topics: [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition] children: [KTABLE-AGGREGATE-12] KTABLE-AGGREGATE-12: states: [KTABLE-AGGREGATE-STATE-STORE-09] children: [KTABLE-TOSTREAM-20] KTABLE-TOSTREAM-20: children: [KSTREAM-SINK-21] KSTREAM-SINK-21: topic: data_udr_month_customer_aggregration KSTREAM-SOURCE-17: topics: [mystreams_app-KSTREAM-MAP-14-repartition] children: [KSTREAM-LEFTJOIN-18] KSTREAM-LEFTJOIN-18: states: [KTABLE-AGGREGATE-STATE-STORE-09] children: [KSTREAM-SINK-19] KSTREAM-SINK-19: topic: data_UDR_joined Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0] {noformat} *Why this matters* The applications does quite a lot of preprocessing before joining with the missing input topic. This preprocessing won't happen without the topic, creating a huge backlog of data. *Fix* Issue an `warn` or `error` level message at start to inform about the missing topic and it's consequences. was: *Case* Streams application with two input topics being used for a left join. When the left side topic is missing upon starting the streams application, it hangs "in the middle" of the topology (at …9, see below). Only parts of the intermediate topics are created (up to …9) When the missing input topic is created, the streams application resumes processing. 
{noformat} Topology: StreamsTask taskId: 2_0 ProcessorTopology: KSTREAM-SOURCE-11: topics: [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition] children: [KTABLE-AGGREGATE-12] KTABLE-AGGREGATE-12: states: [KTABLE-AGGREGATE-STATE-STORE-09] children: [KTABLE-TOSTREAM-20] KTABLE-TOSTREAM-20: children: [KSTREAM-SINK-21] KSTREAM-SINK-21: topic: faxout_udr_month_customer_aggregration KSTREAM-SOURCE-17: topics: [mystreams_app-KSTREAM-MAP-14-repartition] children: [KSTREAM-LEFTJOIN-18] KSTREAM-LEFTJOIN-18: states: [KTABLE-AGGREGATE-STATE-STORE-09] children: [KSTREAM-SINK-19] KSTREAM-SINK-19: topic: data_UDR_joined Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0] {noformat} *Why this matters* The applications does quite a lot of preprocessing before joining with the missing input topic. This preprocessing won't happen without the topic, creating a huge backlog of data. *Fix* Issue an `warn` or `error` level message at start to inform about the missing topic and it's consequences. > Streams does not warn about missing input topics, but hangs > --- > > Key: KAFKA-6437 > URL: https://issues.apache.org/jira/browse/KAFKA-6437 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 > Environment: Single client on single node broker >Reporter: Chris Schwarzfischer >Priority: Minor > > *Case* > Streams application with two input topics being used for a left join. > When the left side topic is missing upon starting the streams application, it > hangs "in the
[jira] [Resolved] (KAFKA-2331) Kafka does not spread partitions in a topic among all consumers evenly
[ https://issues.apache.org/jira/browse/KAFKA-2331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar resolved KAFKA-2331. -- Resolution: Auto Closed Closing inactive issue. The old consumer is no longer supported, please upgrade to the Java consumer whenever possible. > Kafka does not spread partitions in a topic among all consumers evenly > -- > > Key: KAFKA-2331 > URL: https://issues.apache.org/jira/browse/KAFKA-2331 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 0.8.1.1 >Reporter: Stefan Miklosovic > > I want to have 1 topic with 10 partitions. I am using the default configuration > of Kafka. I create 1 topic with 10 partitions by that helper script and now I > am about to produce messages to it. > The thing is that even though all partitions are indeed consumed, some consumers > have more than 1 partition assigned even though I have the number of consumer threads > equal to the number of partitions in the topic, hence some threads are idle. > Let's describe it in more detail. > I know the common advice that you need one consumer thread per partition. I > want to be able to commit offsets per partition and this is possible only > when I have 1 thread per consumer connector per partition (I am using the high > level consumer). > So I create 10 threads, in each thread I am calling > Consumer.createJavaConsumerConnector() where I am doing this > topicCountMap.put("mytopic", 1); > and in the end I have 1 iterator which consumes messages from 1 partition. > When I do this 10 times, I have 10 consumers, one consumer per thread per > partition, where I can commit offsets independently per partition; because if I > put a number different from 1 in the topic map, I would end up with more than 1 > consumer thread for that topic for a given consumer instance, so if I am about > to commit offsets with the created consumer instance, it would commit them for > all threads, which is not desired. 
> But the thing is that when I use consumers, only 7 consumers are involved and > it seems that the other consumer threads are idle but I do not know why. > The thing is that I am creating these consumer threads in a loop. So I start the > first thread (submit to executor service), then another, then another and so > on. > So the scenario is that the first consumer gets all 10 partitions, then the 2nd > connects so it is split between these two to 5 and 5 (or something similar), > then other threads are connecting. > I understand this as partition rebalancing among all consumers, so it > behaves well in the sense that as more consumers are being created, > partition rebalancing occurs between these consumers so every consumer should > have some partitions to operate upon. > But from the results I see that there are only 7 consumers and according to > consumed messages it seems they are split like 3,2,1,1,1,1,1 partition-wise. > Yes, these 7 consumers covered all 10 partitions, but why do consumers with more > than 1 partition not split and give partitions to the remaining 3 consumers? > I am pretty much wondering what is happening with the remaining 3 threads and why > they do not "grab" partitions from consumers which have more than 1 partition > assigned. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
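For reference, the spread the reporter expected — 10 partitions over 10 consumer threads, one partition each — is what an even (range-style) assignment produces whenever every thread is visible to the rebalance. A small standalone sketch of that arithmetic (not the actual rebalancing code):

```java
import java.util.ArrayList;
import java.util.List;

public class RangeAssignment {
    // Splits numPartitions as evenly as possible across numConsumers,
    // giving the first (numPartitions % numConsumers) consumers one extra.
    public static List<Integer> partitionsPerConsumer(int numPartitions, int numConsumers) {
        List<Integer> counts = new ArrayList<>();
        int base = numPartitions / numConsumers;
        int extra = numPartitions % numConsumers;
        for (int i = 0; i < numConsumers; i++) {
            counts.add(base + (i < extra ? 1 : 0));
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(partitionsPerConsumer(10, 10)); // [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
        System.out.println(partitionsPerConsumer(10, 7));  // [2, 2, 2, 1, 1, 1, 1]
    }
}
```

Even with only 7 threads visible, an even split would be 2,2,2,1,1,1,1; the observed 3,2,1,1,1,1,1 suggests some threads never registered with the rebalance at all, consistent with the registration races tracked in KAFKA-2329.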
[jira] [Resolved] (KAFKA-2329) Consumers balance fails when multiple consumers are started simultaneously.
[ https://issues.apache.org/jira/browse/KAFKA-2329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar resolved KAFKA-2329. -- Resolution: Auto Closed Closing inactive issue. The old consumer is no longer supported, please upgrade to the Java consumer whenever possible. > Consumers balance fails when multiple consumers are started simultaneously. > --- > > Key: KAFKA-2329 > URL: https://issues.apache.org/jira/browse/KAFKA-2329 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.8.1.1, 0.8.2.1 >Reporter: Ze'ev Eli Klapow >Assignee: Ze'ev Eli Klapow > Labels: consumer, patch > Attachments: zookeeper-consumer-connector-epoch-node.patch > > > During consumer startup a race condition can occur if multiple consumers are > started (nearly) simultaneously. > If a second consumer is started while the first consumer is in the middle of > {{zkClient.subscribeChildChanges}} the first consumer will never see the > registration of the second consumer, because the consumer registration node > for the second consumer will be unwatched, and no new child will be > registered later. This causes the first consumer to own all partitions, and > then never release ownership causing the second consumer to fail rebalancing. > The attached patch solves this by using an "epoch" node which all consumers > watch and update to trigger a rebalance. When a rebalance is triggered we > check the consumer registrations against a cached state, to avoid unnecessary > rebalances. For safety, we also periodically check the consumer registrations > and rebalance. We have been using this patch in production at HubSpot for a > while and it has eliminated all rebalance issues. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-2025) In multi-consumer setup - explicit commit, commits on all partitions
[ https://issues.apache.org/jira/browse/KAFKA-2025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar resolved KAFKA-2025.
--
Resolution: Auto Closed

Closing inactive issue. The old consumer is no longer supported, please upgrade to the Java consumer whenever possible.

> In multi-consumer setup - explicit commit, commits on all partitions
>
> Key: KAFKA-2025
> URL: https://issues.apache.org/jira/browse/KAFKA-2025
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.8.2.0
> Environment: 1. Tested in Windows
> 2. Not tested on Linux
> Reporter: Pradeep G
> Assignee: Neha Narkhede
> Priority: Critical
>
> In a setup with two consumers C1 & C2 belonging to consumer group CG, and two partitions P1 & P2, with auto-commit disabled:
> An explicit commit on ConsumerConnector commits for all the consumers, i.e. a commit called by C1 also commits the messages being processed by the other consumer, here C2.
> Ideally C1 should be able to commit only the messages it has consumed, not those being processed by C2. The effect of this behavior: suppose C2 crashes while processing message M after C1 commits; then M is not available on recovery and is lost forever, because Kafka has marked it as consumed.
> I read that this would be addressed in the rewrite - https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ConsumerAPI
> Any thoughts on which release this would be addressed in? A quick response would be greatly appreciated.
> Thanks,
> Pradeep
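For reference, the Java consumer that came out of the rewrite supports exactly this via {{KafkaConsumer.commitSync(Map<TopicPartition, OffsetAndMetadata>)}}, which commits only the offsets passed in. The idea can be sketched with plain-Java stand-ins (hypothetical names; no broker or kafka-clients dependency, just the shape of a per-partition commit):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of per-consumer offset commits: a consumer commits only the
// partitions it currently owns, leaving other consumers' in-flight
// partitions untouched (stand-in for commitSync(Map<...>)).
public class PerPartitionCommitSketch {
    // Shared group-level offset store, keyed by partition name (e.g. "P1").
    static final Map<String, Long> committed = new HashMap<>();

    // Each consumer passes only the offsets of partitions it owns.
    static void commitOwned(Map<String, Long> ownedOffsets) {
        committed.putAll(ownedOffsets); // partitions not in the map keep their offsets
    }

    public static void main(String[] args) {
        Map<String, Long> c1Offsets = new HashMap<>();
        c1Offsets.put("P1", 42L);
        commitOwned(c1Offsets);        // C1 commits P1 only
        System.out.println(committed); // P2 is unaffected by C1's commit
    }
}
```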
[jira] [Updated] (KAFKA-5946) Give connector method parameter better name
[ https://issues.apache.org/jira/browse/KAFKA-5946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-5946:
--
Labels: connector newbie usability (was: connector usability)

> Give connector method parameter better name
> ---
>
> Key: KAFKA-5946
> URL: https://issues.apache.org/jira/browse/KAFKA-5946
> Project: Kafka
> Issue Type: Improvement
> Reporter: Ted Yu
> Labels: connector, newbie, usability
>
> During the development of KAFKA-5657, there were several iterations where a method call didn't match what the connector parameter actually represents.
> [~ewencp] had used connType as equivalent to connClass because Type wasn't used to differentiate source vs sink.
> [~ewencp] proposed the following:
> {code}
> It would help to convert all the uses of connType to connClass first, then standardize on class == java class, type == source/sink, name == user-specified name.
> {code}
[jira] [Resolved] (KAFKA-1229) Reload broker config without a restart
[ https://issues.apache.org/jira/browse/KAFKA-1229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar resolved KAFKA-1229.
--
Resolution: Duplicate

Resolving as a duplicate of KIP-226/KAFKA-6240. Please reopen if there is any concern.

> Reload broker config without a restart
> --
>
> Key: KAFKA-1229
> URL: https://issues.apache.org/jira/browse/KAFKA-1229
> Project: Kafka
> Issue Type: Wish
> Components: config
> Affects Versions: 0.8.0
> Reporter: Carlo Cabanilla
> Priority: Minor
>
> In order to minimize client disruption, ideally you'd be able to reload broker config without having to restart the server. On *nix systems the convention is to have the process reread its configuration when it receives a SIGHUP signal.
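The wish can be sketched generically: hold the live configuration in an {{AtomicReference}} and re-read it on a trigger, whether that trigger is a SIGHUP handler or (as KIP-226 later did) a dynamic-config notification. A minimal plain-Java sketch follows (hypothetical structure, not Kafka's actual implementation):

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;

// Sketch: a config holder that can be re-read without restarting the process.
// A SIGHUP handler (or a dynamic-config notification) would call reload().
public class ReloadableConfig {
    private final AtomicReference<Properties> current =
            new AtomicReference<>(new Properties());

    public void reload(Reader source) {
        Properties fresh = new Properties();
        try {
            fresh.load(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        current.set(fresh); // readers atomically see either the old or new config
    }

    public String get(String key, String fallback) {
        return current.get().getProperty(key, fallback);
    }

    public static void main(String[] args) {
        ReloadableConfig cfg = new ReloadableConfig();
        cfg.reload(new StringReader("log.retention.hours=168"));
        System.out.println(cfg.get("log.retention.hours", "?"));
        cfg.reload(new StringReader("log.retention.hours=24")); // live reload
        System.out.println(cfg.get("log.retention.hours", "?"));
    }
}
```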
[jira] [Commented] (KAFKA-6412) Improve synchronization in CachingKeyValueStore methods
[ https://issues.apache.org/jira/browse/KAFKA-6412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320047#comment-16320047 ] Ted Yu commented on KAFKA-6412:
---
I came up with the initial idea for this JIRA while sitting in a hotel lobby at the Grand Canyon. It was a nice way to start 2018.

> Improve synchronization in CachingKeyValueStore methods
> ---
>
> Key: KAFKA-6412
> URL: https://issues.apache.org/jira/browse/KAFKA-6412
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Ted Yu
> Fix For: 1.1.0
>
> Attachments: 6412-jmh.v1.txt, k-6412.v1.txt
>
> Currently CachingKeyValueStore methods are synchronized at the method level.
> It seems we can use a read lock for the getters and a write lock for the put / delete methods.
> For getInternal(), if the calling thread is the streamThread, getInternal() may trigger eviction. This can be handled by obtaining the write lock at the beginning of the method for the streamThread.
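The scheme described here can be sketched with a plain {{ReentrantReadWriteLock}} over a simplified stand-in store (not the actual CachingKeyValueStore), including the stream-thread special case from the description:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch of the locking scheme: read lock for plain getters, write lock for
// mutators, and a write lock for get() on the stream thread, because a cached
// get there may trigger eviction (a mutation).
public class LockedStoreSketch {
    private final Map<String, byte[]> store = new HashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Thread streamThread;

    LockedStoreSketch(Thread streamThread) {
        this.streamThread = streamThread;
    }

    byte[] get(String key) {
        // The stream thread may evict on a get, so it must take the write lock.
        Lock l = Thread.currentThread().equals(streamThread)
                ? lock.writeLock() : lock.readLock();
        l.lock();
        try {
            return store.get(key);
        } finally {
            l.unlock();
        }
    }

    void put(String key, byte[] value) {
        lock.writeLock().lock();
        try {
            store.put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        LockedStoreSketch s = new LockedStoreSketch(Thread.currentThread());
        s.put("k", new byte[]{7});
        System.out.println(s.get("k")[0]);
    }
}
```

Non-stream threads can then read concurrently, instead of serializing on a single monitor as the `synchronized` methods did.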
[jira] [Commented] (KAFKA-6412) Improve synchronization in CachingKeyValueStore methods
[ https://issues.apache.org/jira/browse/KAFKA-6412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320013#comment-16320013 ] ASF GitHub Bot commented on KAFKA-6412:
---
dguy closed pull request #4372: KAFKA-6412 Improve synchronization in CachingKeyValueStore methods
URL: https://github.com/apache/kafka/pull/4372

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index f0669a4f6ee..9fff8ccca04 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -31,6 +31,9 @@
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore implements KeyValueStore<Bytes, byte[]>, CachedStateStore<K, V> {
@@ -44,6 +47,7 @@
     private InternalProcessorContext context;
     private StateSerdes<K, V> serdes;
     private Thread streamThread;
+    private ReadWriteLock lock = new ReentrantReadWriteLock();
 
     CachingKeyValueStore(final KeyValueStore<Bytes, byte[]> underlying,
                          final Serde<K> keySerde,
@@ -108,9 +112,14 @@ public void setFlushListener(final CacheFlushListener<K, V> flushListener,
     }
 
     @Override
-    public synchronized void flush() {
-        cache.flush(cacheName);
-        underlying.flush();
+    public void flush() {
+        lock.writeLock().lock();
+        try {
+            cache.flush(cacheName);
+            underlying.flush();
+        } finally {
+            lock.writeLock().unlock();
+        }
     }
 
     @Override
@@ -131,10 +140,21 @@ public boolean isOpen() {
     }
 
     @Override
-    public synchronized byte[] get(final Bytes key) {
+    public byte[] get(final Bytes key) {
         validateStoreOpen();
-        Objects.requireNonNull(key);
-        return getInternal(key);
+        Lock theLock;
+        if (Thread.currentThread().equals(streamThread)) {
+            theLock = lock.writeLock();
+        } else {
+            theLock = lock.readLock();
+        }
+        theLock.lock();
+        try {
+            Objects.requireNonNull(key);
+            return getInternal(key);
+        } finally {
+            theLock.unlock();
+        }
     }
 
     private byte[] getInternal(final Bytes key) {
@@ -176,50 +196,75 @@ public boolean isOpen() {
     }
 
     @Override
-    public synchronized long approximateNumEntries() {
+    public long approximateNumEntries() {
         validateStoreOpen();
-        return underlying.approximateNumEntries();
+        lock.readLock().lock();
+        try {
+            return underlying.approximateNumEntries();
+        } finally {
+            lock.readLock().unlock();
+        }
     }
 
     @Override
-    public synchronized void put(final Bytes key, final byte[] value) {
+    public void put(final Bytes key, final byte[] value) {
         Objects.requireNonNull(key, "key cannot be null");
         validateStoreOpen();
-        putInternal(key, value);
+        lock.writeLock().lock();
+        try {
+            putInternal(key, value);
+        } finally {
+            lock.writeLock().unlock();
+        }
     }
 
-    private synchronized void putInternal(final Bytes rawKey, final byte[] value) {
+    private void putInternal(final Bytes rawKey, final byte[] value) {
         Objects.requireNonNull(rawKey, "key cannot be null");
         cache.put(cacheName, rawKey, new LRUCacheEntry(value, true, context.offset(),
-                context.timestamp(), context.partition(), context.topic()));
+                  context.timestamp(), context.partition(), context.topic()));
     }
 
     @Override
-    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
+    public byte[] putIfAbsent(final Bytes key, final byte[] value) {
         Objects.requireNonNull(key, "key cannot be null");
        validateStoreOpen();
-        final byte[] v = getInternal(key);
-        if (v == null) {
-            putInternal(key, value);
+        lock.writeLock().lock();
+        try {
+            final byte[] v = getInternal(key);
+            if (v == null) {
+                putInternal(key, value);
+            }
+            return v;
+        } finally {
+            lock.writeLock().unlock();
         }
-        return v;
     }
 
     @Override
-
[jira] [Resolved] (KAFKA-6412) Improve synchronization in CachingKeyValueStore methods
[ https://issues.apache.org/jira/browse/KAFKA-6412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Damian Guy resolved KAFKA-6412.
---
Resolution: Fixed
Fix Version/s: 1.1.0

Issue resolved by pull request 4372
[https://github.com/apache/kafka/pull/4372]

> Improve synchronization in CachingKeyValueStore methods
> ---
>
> Key: KAFKA-6412
> URL: https://issues.apache.org/jira/browse/KAFKA-6412
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Ted Yu
> Fix For: 1.1.0
>
> Attachments: 6412-jmh.v1.txt, k-6412.v1.txt
>
> Currently CachingKeyValueStore methods are synchronized at the method level.
> It seems we can use a read lock for the getters and a write lock for the put / delete methods.
> For getInternal(), if the calling thread is the streamThread, getInternal() may trigger eviction. This can be handled by obtaining the write lock at the beginning of the method for the streamThread.
[jira] [Commented] (KAFKA-4029) SSL support for Connect REST API
[ https://issues.apache.org/jira/browse/KAFKA-4029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319960#comment-16319960 ] Jakub Scholz commented on KAFKA-4029:
-
[~rhauch] Yeah, I have something already. If you want, I can rebase it and create a PR so that you can have a look at it. I'm busy with something else right now, but I should be able to do it by the end of the week.

> SSL support for Connect REST API
>
> Key: KAFKA-4029
> URL: https://issues.apache.org/jira/browse/KAFKA-4029
> Project: Kafka
> Issue Type: Improvement
> Components: KafkaConnect
> Reporter: Ewen Cheslack-Postava
> Assignee: Jakub Scholz
>
> Currently the Connect REST API only supports http. We should also add SSL support so access to the REST API can be secured.
[jira] [Assigned] (KAFKA-5624) Unsafe use of expired sensors
[ https://issues.apache.org/jira/browse/KAFKA-5624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar reassigned KAFKA-5624:
-
Assignee: Manikumar

> Unsafe use of expired sensors
> -
>
> Key: KAFKA-5624
> URL: https://issues.apache.org/jira/browse/KAFKA-5624
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Assignee: Manikumar
>
> There seem to be a couple of unhandled cases following sensor expiration:
> 1. Static sensors (such as {{ClientQuotaManager.delayQueueSensor}}) can be expired due to inactivity, but the references will remain valid and usable. It is probably a good idea to either ensure we use a "get or create" pattern when accessing the sensor, or add a new static registration option which makes the sensor ineligible for expiration.
> 2. It is possible to register metrics through the sensor even after it is expired. We should probably raise an exception instead.
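The "get or create" pattern suggested in item 1 can be sketched with {{ConcurrentHashMap.computeIfAbsent}} (a hypothetical Sensor stand-in, not Kafka's metrics classes): the point is that callers re-resolve the sensor at the point of use instead of caching a reference the expiration thread may invalidate.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the "get or create" pattern: never hold a static reference to a
// sensor that the expiration thread may remove; always look it up (and
// recreate it if it was expired) at the point of use.
public class SensorRegistrySketch {
    static class Sensor {
        final String name;
        Sensor(String name) { this.name = name; }
    }

    private final Map<String, Sensor> sensors = new ConcurrentHashMap<>();

    // Expiration can remove an entry at any time...
    void expire(String name) {
        sensors.remove(name);
    }

    // ...so callers re-resolve instead of caching the reference.
    Sensor getOrCreate(String name) {
        return sensors.computeIfAbsent(name, Sensor::new);
    }

    public static void main(String[] args) {
        SensorRegistrySketch registry = new SensorRegistrySketch();
        Sensor s = registry.getOrCreate("delay-queue");
        registry.expire("delay-queue");
        // A stale reference to 's' would now record into a dead sensor;
        // re-resolving yields a fresh, registered one.
        System.out.println(registry.getOrCreate("delay-queue") != s);
    }
}
```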