[jira] [Commented] (KAFKA-8326) Add List Serde
[ https://issues.apache.org/jira/browse/KAFKA-8326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974886#comment-16974886 ] Sophie Blee-Goldman commented on KAFKA-8326: Oh sorry, I thought the code part had already been merged for some reason. Nevermind! > Add List Serde > - > > Key: KAFKA-8326 > URL: https://issues.apache.org/jira/browse/KAFKA-8326 > Project: Kafka > Issue Type: Improvement > Components: clients, streams >Reporter: Daniyar Yeralin >Assignee: Daniyar Yeralin >Priority: Minor > Labels: kip > > _This ticket proposes adding new {color:#4c9aff}ListSerializer{color} and > {color:#4c9aff}ListDeserializer{color} classes as well as support for the new > classes into the Serdes class. This will allow using List Serde of type_ > {color:#4c9aff}_, T>_{color} _directly from Consumers, > Producers and Streams._ > _{color:#4c9aff}List{color} serialization and deserialization will be done > through repeatedly calling a serializer/deserializer for each entry provided > by passed generic {color:#4c9aff}T{color}'s Serde. For example, if you want > to create List of Strings serde, then serializer/deserializer of StringSerde > will be used to serialize/deserialize each entry in > {color:#4c9aff}List{color}._ > I believe there are many use cases where List Serde could be used. Ex. > [https://stackoverflow.com/questions/41427174/aggregate-java-objects-in-a-list-with-kafka-streams-dsl-windows], > > [https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api] > For instance, aggregate grouped (by key) values together in a list to do > other subsequent operations on the collection. > KIP Link: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List%3CT%3E+serialization+and+deserialization] -- This message was sent by Atlassian Jira (v8.3.4#803005)
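The exact wire format and class names are defined in KIP-466 and its PR; the core delegation idea (length-prefix each entry and hand it to the inner type's serializer/deserializer) can be sketched as follows. This is an illustrative standalone sketch, not the merged Kafka API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Illustrative sketch of a List serde: each entry is serialized by the
// inner (element-type) serializer and length-prefixed with a 4-byte int
// so the deserializer can split the payload back into entries.
public class ListSerdeSketch {
    public static <T> byte[] serialize(List<T> data, Function<T, byte[]> innerSerializer) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            for (T entry : data) {
                byte[] bytes = innerSerializer.apply(entry);
                // Length prefix lets the deserializer know where each entry ends.
                out.write(ByteBuffer.allocate(4).putInt(bytes.length).array());
                out.write(bytes);
            }
        } catch (IOException e) {
            throw new RuntimeException(e); // cannot happen for an in-memory stream
        }
        return out.toByteArray();
    }

    public static <T> List<T> deserialize(byte[] payload, Function<byte[], T> innerDeserializer) {
        List<T> result = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.wrap(payload);
        while (buf.hasRemaining()) {
            byte[] bytes = new byte[buf.getInt()];
            buf.get(bytes);
            result.add(innerDeserializer.apply(bytes));
        }
        return result;
    }
}
```

For a `List<String>` serde, the inner functions would be the String serde's serialize/deserialize, matching the StringSerde example in the ticket.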
[jira] [Commented] (KAFKA-9146) Add option to force delete members in stream reset tool
[ https://issues.apache.org/jira/browse/KAFKA-9146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974873#comment-16974873 ] Matthias J. Sax commented on KAFKA-9146: Even with a public API change, it might be suitable to get started. Let's see what [~bchen225242] thinks. Of course, if you want to pick up something else, that's also fine. > Add option to force delete members in stream reset tool > --- > > Key: KAFKA-9146 > URL: https://issues.apache.org/jira/browse/KAFKA-9146 > Project: Kafka > Issue Type: Improvement > Components: consumer, streams >Reporter: Boyang Chen >Assignee: feyman >Priority: Major > Labels: newbie > > Sometimes people want to reset the stream application sooner, but blocked by > the left-over members inside group coordinator, which only expire after > session timeout. When user configures a really long session timeout, it could > prevent the group from clearing. We should consider adding the support to > cleanup members by forcing them to leave the group. To do that, > # If the stream application is already on static membership, we could call > directly from adminClient.removeMembersFromGroup > # If the application is on dynamic membership, we should modify > adminClient.removeMembersFromGroup interface to allow deletion based on > member.id. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9194) Missing documentation for replicaMaxWaitTimeMs config value
Tomasz Szlek created KAFKA-9194: --- Summary: Missing documentation for replicaMaxWaitTimeMs config value Key: KAFKA-9194 URL: https://issues.apache.org/jira/browse/KAFKA-9194 Project: Kafka Issue Type: Improvement Components: documentation Affects Versions: 2.3.0 Reporter: Tomasz Szlek I have read the documentation and was interested in the *replica.fetch.min.bytes* property. Its description mentions the related config *replicaMaxWaitTimeMs*, but there is no documentation for that related config at all. Can you add it to the [configuration page|https://kafka.apache.org/documentation/#newconsumerconfigs]? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9146) Add option to force delete members in stream reset tool
[ https://issues.apache.org/jira/browse/KAFKA-9146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974856#comment-16974856 ] feyman commented on KAFKA-9146: --- Thanks, [~mjsax]! If it does imply a public API change and has a broader impact more suitable for a veteran, I could start with some other task. :) > Add option to force delete members in stream reset tool > --- > > Key: KAFKA-9146 > URL: https://issues.apache.org/jira/browse/KAFKA-9146 > Project: Kafka > Issue Type: Improvement > Components: consumer, streams >Reporter: Boyang Chen >Assignee: feyman >Priority: Major > Labels: newbie > > Sometimes people want to reset the stream application sooner, but blocked by > the left-over members inside group coordinator, which only expire after > session timeout. When user configures a really long session timeout, it could > prevent the group from clearing. We should consider adding the support to > cleanup members by forcing them to leave the group. To do that, > # If the stream application is already on static membership, we could call > directly from adminClient.removeMembersFromGroup > # If the application is on dynamic membership, we should modify > adminClient.removeMembersFromGroup interface to allow deletion based on > member.id. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8326) Add List Serde
[ https://issues.apache.org/jira/browse/KAFKA-8326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974850#comment-16974850 ] Matthias J. Sax commented on KAFKA-8326: [~ableegoldman] I raised this already on the PR, that should include all tests and all corresponding doc updates. > Add List Serde > - > > Key: KAFKA-8326 > URL: https://issues.apache.org/jira/browse/KAFKA-8326 > Project: Kafka > Issue Type: Improvement > Components: clients, streams >Reporter: Daniyar Yeralin >Assignee: Daniyar Yeralin >Priority: Minor > Labels: kip > > _This ticket proposes adding new {color:#4c9aff}ListSerializer{color} and > {color:#4c9aff}ListDeserializer{color} classes as well as support for the new > classes into the Serdes class. This will allow using List Serde of type_ > {color:#4c9aff}_, T>_{color} _directly from Consumers, > Producers and Streams._ > _{color:#4c9aff}List{color} serialization and deserialization will be done > through repeatedly calling a serializer/deserializer for each entry provided > by passed generic {color:#4c9aff}T{color}'s Serde. For example, if you want > to create List of Strings serde, then serializer/deserializer of StringSerde > will be used to serialize/deserialize each entry in > {color:#4c9aff}List{color}._ > I believe there are many use cases where List Serde could be used. Ex. > [https://stackoverflow.com/questions/41427174/aggregate-java-objects-in-a-list-with-kafka-streams-dsl-windows], > > [https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api] > For instance, aggregate grouped (by key) values together in a list to do > other subsequent operations on the collection. > KIP Link: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List%3CT%3E+serialization+and+deserialization] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9169) Standby Tasks point ask for incorrect offsets on resuming post suspension
[ https://issues.apache.org/jira/browse/KAFKA-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974852#comment-16974852 ] Matthias J. Sax commented on KAFKA-9169: Yeah. If we confirm for older version, we can just update the ticket. > Standby Tasks point ask for incorrect offsets on resuming post suspension > - > > Key: KAFKA-9169 > URL: https://issues.apache.org/jira/browse/KAFKA-9169 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.0 >Reporter: Navinder Brar >Assignee: John Roesler >Priority: Critical > Fix For: 2.5.0 > > > In versions(check 2.0) where standby tasks are suspended on each rebalance > the checkpoint file is updated post the flush and the expected behaviour is > that post assignment the same standby task gets assigned back on the machine > it will start reading data from changelog from the same offset from it left > off. > > But there looks like a bug in the code, every time post rebalance it starts > reading from the offset from where it read the first time the task was > assigned on this machine. This has 2 repercussions: > # After every rebalance the standby tasks start restoring huge amount of > data which they have already restored earlier(Verified this via 300x increase > Network IO on all streams instances post rebalance even when no change in > assignment) . > # If changelog has time retention those offsets will not be available in the > changelog, which leads to offsetOutOfRange exceptions and the stores get > deleted and recreated again. > > I have gone through the code and I think I know the issue. > In TaskManager# updateNewAndRestoringTasks(), the function > assignStandbyPartitions() gets called for all the running standby tasks where > it populates the Map: checkpointedOffsets from the > standbyTask.checkpointedOffsets() which is only updated at the time of > initialization of a StandbyTask(i.e. in it's constructor). > > This has an easy fix. 
> Post resumption we are reading standbyTask.checkpointedOffsets() to know the > offset from where the standby task should start running and not from > stateMgr.checkpointed() which gets updated on every commit to the checkpoint > file. In the former case it's always reading from the same offset, even those > which it had already read earlier and in cases where changelog topic has a > retention time, it gives offsetOutOfRange exception. So, > standbyTask.checkpointedOffsets() is quite useless and we should use > stateMgr.checkpointed() instead to return offsets to task manager. -- This message was sent by Atlassian Jira (v8.3.4#803005)
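The root cause described in the report (a snapshot of checkpointed offsets taken in the standby task's constructor versus the live, commit-updated map in the state manager) can be illustrated with a toy model. The class names below mirror the report, but this is an illustrative sketch, not Kafka's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the bug: the task snapshots the state manager's checkpointed
// offsets at construction time, so later commits (which advance the state
// manager's map) are never reflected by checkpointedOffsets().
public class CheckpointModel {
    static class StateManager {
        private final Map<String, Long> checkpointed = new HashMap<>();
        Map<String, Long> checkpointed() { return new HashMap<>(checkpointed); }
        void commit(String partition, long offset) { checkpointed.put(partition, offset); }
    }

    static class StandbyTaskModel {
        private final StateManager stateMgr;
        private final Map<String, Long> snapshotAtConstruction;

        StandbyTaskModel(StateManager stateMgr) {
            this.stateMgr = stateMgr;
            // Taken once, in the constructor: stale after any subsequent commit.
            this.snapshotAtConstruction = stateMgr.checkpointed();
        }

        // Buggy behaviour per the report: returns the constructor-time snapshot.
        Map<String, Long> checkpointedOffsets() { return snapshotAtConstruction; }

        // Fix suggested in the ticket: read the live checkpoint state instead.
        Map<String, Long> liveCheckpointedOffsets() { return stateMgr.checkpointed(); }
    }
}
```

After a commit advances the changelog offset, only the live read reflects it, which is why resuming from the constructor-time snapshot re-restores already-restored data.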
[jira] [Commented] (KAFKA-9146) Add option to force delete members in stream reset tool
[ https://issues.apache.org/jira/browse/KAFKA-9146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974847#comment-16974847 ] Matthias J. Sax commented on KAFKA-9146: Thanks for your interest in Kafka! Added you to the list of contributors and assigned the ticket to you. You can now also self-assign tickets. I actually think that this ticket requires a KIP, as it implies a public API change? \cc [~bchen225242] WDYT? > Add option to force delete members in stream reset tool > --- > > Key: KAFKA-9146 > URL: https://issues.apache.org/jira/browse/KAFKA-9146 > Project: Kafka > Issue Type: Improvement > Components: consumer, streams >Reporter: Boyang Chen >Assignee: feyman >Priority: Major > Labels: newbie > > Sometimes people want to reset the stream application sooner, but blocked by > the left-over members inside group coordinator, which only expire after > session timeout. When user configures a really long session timeout, it could > prevent the group from clearing. We should consider adding the support to > cleanup members by forcing them to leave the group. To do that, > # If the stream application is already on static membership, we could call > directly from adminClient.removeMembersFromGroup > # If the application is on dynamic membership, we should modify > adminClient.removeMembersFromGroup interface to allow deletion based on > member.id. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-9146) Add option to force delete members in stream reset tool
[ https://issues.apache.org/jira/browse/KAFKA-9146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-9146: -- Assignee: feyman > Add option to force delete members in stream reset tool > --- > > Key: KAFKA-9146 > URL: https://issues.apache.org/jira/browse/KAFKA-9146 > Project: Kafka > Issue Type: Improvement > Components: consumer, streams >Reporter: Boyang Chen >Assignee: feyman >Priority: Major > Labels: newbie > > Sometimes people want to reset the stream application sooner, but blocked by > the left-over members inside group coordinator, which only expire after > session timeout. When user configures a really long session timeout, it could > prevent the group from clearing. We should consider adding the support to > cleanup members by forcing them to leave the group. To do that, > # If the stream application is already on static membership, we could call > directly from adminClient.removeMembersFromGroup > # If the application is on dynamic membership, we should modify > adminClient.removeMembersFromGroup interface to allow deletion based on > member.id. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9190) Server leaves connections with expired authentication sessions open
[ https://issues.apache.org/jira/browse/KAFKA-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974806#comment-16974806 ] Ismael Juma commented on KAFKA-9190: Nice catch. cc [~rndgstn] > Server leaves connections with expired authentication sessions open > > > Key: KAFKA-9190 > URL: https://issues.apache.org/jira/browse/KAFKA-9190 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > SocketServer implements some logic to disconnect connections which have > expired authentication sessions. At the moment, we just call > `SelectionKey.cancel` in order to trigger this disconnect. I think the > expectation is that this causes the channel to be closed on the next `poll`, > but as far as I can tell, all it does is disassociate the selection key from > the selector. This means that the key never gets selected again and we never > close the connection until the client times out. > This was found when debugging the flaky test failure > `EndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl`. > I modified the code to call `Selector.close` instead of > `TransportLayer.disconnect`. I was able to reproduce the session > authentication expiration, but the connection properly closes and the test no > longer times out. -- This message was sent by Atlassian Jira (v8.3.4#803005)
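The NIO behaviour this ticket hinges on can be checked in isolation: `SelectionKey.cancel` only deregisters the key from the selector, while the underlying channel stays open until `close()` is called on it. The snippet below is a standalone demonstration (using a `Pipe`), not Kafka code.

```java
import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Demonstrates that cancelling a SelectionKey does NOT close the channel:
// the key is merely removed from the selector, so the connection would
// linger until the peer times out, exactly as the ticket describes.
public class CancelVsClose {
    public static boolean[] demo() {
        try {
            Selector selector = Selector.open();
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            SelectionKey key = pipe.source().register(selector, SelectionKey.OP_READ);

            key.cancel();
            selector.selectNow(); // processes the cancelled-key set
            boolean openAfterCancel = pipe.source().isOpen(); // channel still open

            pipe.source().close(); // an explicit close is what actually disconnects
            boolean openAfterClose = pipe.source().isOpen();

            pipe.sink().close();
            selector.close();
            return new boolean[] { openAfterCancel, openAfterClose };
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```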
[jira] [Commented] (KAFKA-9146) Add option to force delete members in stream reset tool
[ https://issues.apache.org/jira/browse/KAFKA-9146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974762#comment-16974762 ] feyman commented on KAFKA-9146: --- Hi, I would like to contribute to Kafka and take this task as a starting point. Could you add me to the contributor list? Thanks! > Add option to force delete members in stream reset tool > --- > > Key: KAFKA-9146 > URL: https://issues.apache.org/jira/browse/KAFKA-9146 > Project: Kafka > Issue Type: Improvement > Components: consumer, streams >Reporter: Boyang Chen >Priority: Major > Labels: newbie > > Sometimes people want to reset the stream application sooner, but blocked by > the left-over members inside group coordinator, which only expire after > session timeout. When user configures a really long session timeout, it could > prevent the group from clearing. We should consider adding the support to > cleanup members by forcing them to leave the group. To do that, > # If the stream application is already on static membership, we could call > directly from adminClient.removeMembersFromGroup > # If the application is on dynamic membership, we should modify > adminClient.removeMembersFromGroup interface to allow deletion based on > member.id. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-3096) Leader is not set to -1 when it is shutdown if followers are down
[ https://issues.apache.org/jira/browse/KAFKA-3096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma resolved KAFKA-3096. Resolution: Fixed > Leader is not set to -1 when it is shutdown if followers are down > - > > Key: KAFKA-3096 > URL: https://issues.apache.org/jira/browse/KAFKA-3096 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.0 >Reporter: Ismael Juma >Assignee: Ismael Juma >Priority: Major > Labels: reliability > > Assuming a cluster with 2 brokers with unclean leader election disabled: > 1. Start brokers 0 and 1 > 2. Perform partition assignment > 3. Broker 0 is elected leader > 4. Produce message and wait until metadata is propagated > 6. Shutdown follower > 7. Produce message > 8. Shutdown leader > 9. Start follower > 10. Wait for leader election > Expected: leader is -1 > Actual: leader is 0 > We have a test for this, but a bug in `waitUntilLeaderIsElectedOrChanged` > means that `newLeaderOpt` is not being checked. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8488) FetchSessionHandler logging create 73 mb allocation in TLAB which could be no op
[ https://issues.apache.org/jira/browse/KAFKA-8488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974741#comment-16974741 ] ASF GitHub Bot commented on KAFKA-8488: --- wenhoujx commented on pull request #6965: KAFKA-8488 avoid String.format(), it's gabage. URL: https://github.com/apache/kafka/pull/6965 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > FetchSessionHandler logging create 73 mb allocation in TLAB which could be no > op > - > > Key: KAFKA-8488 > URL: https://issues.apache.org/jira/browse/KAFKA-8488 > Project: Kafka > Issue Type: Improvement >Reporter: Wenshuai Hou >Priority: Minor > Fix For: 2.4.0 > > Attachments: image-2019-06-05-14-04-35-668.png > > > !image-2019-06-05-14-04-35-668.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9193) org.apache.kafka.common.utils.Timer should use monotonic clock
Lucas Bradstreet created KAFKA-9193: --- Summary: org.apache.kafka.common.utils.Timer should use monotonic clock Key: KAFKA-9193 URL: https://issues.apache.org/jira/browse/KAFKA-9193 Project: Kafka Issue Type: Bug Reporter: Lucas Bradstreet utils.Timer uses System.currentTimeMillis to implement blocking methods with timeouts. We should not rely on a non-monotonic clock and should instead switch this to Time.hiResClockMs() (which uses System.nanoTime). When we do so we should revert [https://github.com/apache/kafka/pull/7683] which was caused by inaccuracies in our current approach (the test was good, the code is bad). -- This message was sent by Atlassian Jira (v8.3.4#803005)
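The switch the ticket proposes can be sketched with a small deadline timer built on `System.nanoTime()`, which is monotonic and immune to wall-clock adjustments (NTP steps, manual changes), unlike `System.currentTimeMillis()`. The class below is illustrative and only mirrors the idea behind `Time.hiResClockMs()`; it is not Kafka's `utils.Timer`.

```java
import java.util.concurrent.TimeUnit;

// Sketch of a timeout/deadline timer based on a monotonic clock.
// If the wall clock jumps backwards, a currentTimeMillis-based deadline
// could take far longer than intended to expire; nanoTime cannot jump.
public class MonotonicTimer {
    private final long deadlineMs;

    public MonotonicTimer(long timeoutMs) {
        this.deadlineMs = hiResClockMs() + timeoutMs;
    }

    // Monotonic milliseconds, mirroring the idea behind Time.hiResClockMs().
    static long hiResClockMs() {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
    }

    public long remainingMs() {
        return Math.max(0, deadlineMs - hiResClockMs());
    }

    public boolean isExpired() {
        return remainingMs() == 0;
    }
}
```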
[jira] [Commented] (KAFKA-9191) Kafka throughput suffers substantially when scaling topic partitions with small messages
[ https://issues.apache.org/jira/browse/KAFKA-9191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974615#comment-16974615 ] Chris Pettitt commented on KAFKA-9191: -- We have not teased that apart. We do have the broker metrics available to look at (and they can be regenerated with one or both of the in-house perf tools). On the broker side we observed heavier CPU utilization with 100 byte messages (capping at 100%), while utilization was lower (50-75%) for 512 byte messages. > Kafka throughput suffers substantially when scaling topic partitions with > small messages > > > Key: KAFKA-9191 > URL: https://issues.apache.org/jira/browse/KAFKA-9191 > Project: Kafka > Issue Type: Bug >Reporter: Chris Pettitt >Priority: Major > > We have observed, using two entirely different tools, that a simple Kafka > application (read 1 topic and immediately produce to another) suffers > substantial throughput degradation when scaling up topics. Below is the > output of one of these tools, showing that going from 1 partition to 1000 > partitions yields a ~30% throughput decrease when messages are 100 bytes long. > Using the same two tools, we observed that increasing the message size to 512 > bytes yields a throughput increase of ~20% going from 1 topic partition to > 1000 topic partitions with all other variables held constant. > > |Kafka Core Testing| | | | | | | | | | | > |Enable Transaction|Batch Size (b)|Linger (ms)|Max Inflight|Commit Interval > (ms)|Num Records|Record Size (b)|Num Input Topics|1 Partition MB/s|1000 > Partitions MB/s|MB/s delta| > |FALSE|16384|100|5|1000|2000|100|1|45.633625|31.482193|-31.01%| > |FALSE|16384|100|5|1000|2000|512|1|70.217902|85.319107|21.51%| -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9011) Add KStream#flatTransform and KStream#flatTransformValues to Scala API
[ https://issues.apache.org/jira/browse/KAFKA-9011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974604#comment-16974604 ] ASF GitHub Bot commented on KAFKA-9011: --- bbejeck commented on pull request #7685: KAFKA-9011: Removed multiple calls to supplier.get() URL: https://github.com/apache/kafka/pull/7685 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add KStream#flatTransform and KStream#flatTransformValues to Scala API > -- > > Key: KAFKA-9011 > URL: https://issues.apache.org/jira/browse/KAFKA-9011 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.3.0 >Reporter: Alex Kokachev >Assignee: Alex Kokachev >Priority: Major > Labels: scala, streams > Fix For: 2.5.0 > > > Part of KIP-313: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-313%3A+Add+KStream.flatTransform+and+KStream.flatTransformValues] > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9192) NullPointerException if field in schema not present in value
Mark Tinsley created KAFKA-9192: --- Summary: NullPointerException if field in schema not present in value Key: KAFKA-9192 URL: https://issues.apache.org/jira/browse/KAFKA-9192 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.2.1 Reporter: Mark Tinsley Given a message: {code:java} { "schema":{ "type":"struct", "fields":[ { "type":"string", "optional":true, "field":"abc" } ], "optional":false, "name":"foobar" }, "payload":{ } } {code} I would expect, given the field is optional, for the JsonConverter to still process this value. What happens is I get a null pointer exception, the stacktrace points to this line: https://github.com/apache/kafka/blob/2.1/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L701 called by https://github.com/apache/kafka/blob/2.1/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L181 Issue seems to be that we need to check and see if the jsonValue is null before checking if the jsonValue has a null value. -- This message was sent by Atlassian Jira (v8.3.4#803005)
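The fix suggested in the report is an ordering of null checks: when a schema field is absent from the payload, the node lookup returns Java `null`, and calling a method like `isNull()` on it throws NPE; "field absent" should be handled before (and the same as) "field explicitly null". The sketch below models this in plain Java (a real converter would use Jackson's `JsonNode`); all names are illustrative.

```java
import java.util.Map;

// Models the NPE in the report: looking up an optional field that is absent
// from the payload yields null, so the converter must check for a null
// reference before asking the node whether it holds a JSON null.
public class OptionalFieldSketch {
    // Stand-in for a JSON tree node (real code would use Jackson's JsonNode).
    interface Node { boolean isNull(); }

    static Object convertOptionalField(Map<String, Node> payload, String field) {
        Node value = payload.get(field); // null reference if the field is absent
        if (value == null || value.isNull()) {
            // Optional field: absent and explicit JSON null both map to null,
            // instead of throwing NullPointerException on value.isNull().
            return null;
        }
        return value;
    }
}
```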
[jira] [Commented] (KAFKA-9191) Kafka throughput suffers substantially when scaling topic partitions with small messages
[ https://issues.apache.org/jira/browse/KAFKA-9191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974585#comment-16974585 ] Ismael Juma commented on KAFKA-9191: Thanks for the report. Do we know if the bottleneck was the client or broker? > Kafka throughput suffers substantially when scaling topic partitions with > small messages > > > Key: KAFKA-9191 > URL: https://issues.apache.org/jira/browse/KAFKA-9191 > Project: Kafka > Issue Type: Bug >Reporter: Chris Pettitt >Priority: Major > > We have observed, using two entirely different tools, that a simple Kafka > application (read 1 topic and immediately produce to another) suffers > substantial throughput degradation when scaling up topics. Below is the > output of one of these tools, showing that going from 1 partition to 1000 > partitions yields a ~30% throughput decrease when messages are 100 bytes long. > Using the same two tools, we observed that increasing the message size to 512 > bytes yields a throughput increase of ~20% going from 1 topic partition to > 1000 topic partitions with all other variables held constant. > > |Kafka Core Testing| | | | | | | | | | | > |Enable Transaction|Batch Size (b)|Linger (ms)|Max Inflight|Commit Interval > (ms)|Num Records|Record Size (b)|Num Input Topics|1 Partition MB/s|1000 > Partitions MB/s|MB/s delta| > |FALSE|16384|100|5|1000|2000|100|1|45.633625|31.482193|-31.01%| > |FALSE|16384|100|5|1000|2000|512|1|70.217902|85.319107|21.51%| -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9191) Kafka throughput suffers substantially when scaling topic partitions with small messages
[ https://issues.apache.org/jira/browse/KAFKA-9191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Pettitt updated KAFKA-9191: - Description: We have observed, using two entirely different tools, that a simple Kafka application (read 1 topic and immediately produce to another) suffers substantial throughput degradation when scaling up topics. Below is the output of one of these tools, showing that going from 1 partition to 1000 partitions yields a ~30% throughput decrease when messages are 100 bytes long. Using the same two tools, we observed that increasing the message size to 512 bytes yields a throughput increase of ~20% going from 1 topic partition to 1000 topic partitions with all other variables held constant. |Kafka Core Testing| | | | | | | | | | | |Enable Transaction|Batch Size (b)|Linger (ms)|Max Inflight|Commit Interval (ms)|Num Records|Record Size (b)|Num Input Topics|1 Partition MB/s|1000 Partitions MB/s|MB/s delta| |FALSE|16384|100|5|1000|2000|100|1|45.633625|31.482193|-31.01%| |FALSE|16384|100|5|1000|2000|512|1|70.217902|85.319107|21.51%| was: We have observed, using two entirely different tools, that a simple Kafka application (read 1 topic and immediately produce to another) suffers substantial performance degradation when scaling up topics. Below is the output of one of these tools, showing that going from 1 partition to 1000 partitions yields a ~30% throughput decrease when messages are 100 bytes long. Using the same two tools, we observed that increasing the message size to 512 bytes yields a throughput increase of ~20% going from 1 topic partition to 1000 topic partitions with all other variables held constant. 
|Kafka Core Testing| | | | | | | | | | | |Enable Transaction|Batch Size (b)|Linger (ms)|Max Inflight|Commit Interval (ms)|Num Records|Record Size (b)|Num Input Topics|1 Partition MB/s|1000 Partitions MB/s|MB/s delta| |FALSE|16384|100|5|1000|2000|100|1|45.633625|31.482193|-31.01%| |FALSE|16384|100|5|1000|2000|512|1|70.217902|85.319107|21.51%| > Kafka throughput suffers substantially when scaling topic partitions with > small messages > > > Key: KAFKA-9191 > URL: https://issues.apache.org/jira/browse/KAFKA-9191 > Project: Kafka > Issue Type: Bug >Reporter: Chris Pettitt >Priority: Major > > We have observed, using two entirely different tools, that a simple Kafka > application (read 1 topic and immediately produce to another) suffers > substantial throughput degradation when scaling up topics. Below is the > output of one of these tools, showing that going from 1 partition to 1000 > partitions yields a ~30% throughput decrease when messages are 100 bytes long. > Using the same two tools, we observed that increasing the message size to 512 > bytes yields a throughput increase of ~20% going from 1 topic partition to > 1000 topic partitions with all other variables held constant. > > |Kafka Core Testing| | | | | | | | | | | > |Enable Transaction|Batch Size (b)|Linger (ms)|Max Inflight|Commit Interval > (ms)|Num Records|Record Size (b)|Num Input Topics|1 Partition MB/s|1000 > Partitions MB/s|MB/s delta| > |FALSE|16384|100|5|1000|2000|100|1|45.633625|31.482193|-31.01%| > |FALSE|16384|100|5|1000|2000|512|1|70.217902|85.319107|21.51%| -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9191) Kafka throughput suffers substantially when scaling topic partitions with small messages
[ https://issues.apache.org/jira/browse/KAFKA-9191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Pettitt updated KAFKA-9191: - Description: We have observed, using two entirely different tools, that a simple Kafka application (read 1 topic and immediately produce to another) suffers substantial performance degradation when scaling up topics. Below is the output of one of these tools, showing that going from 1 partition to 1000 partitions yields a ~30% throughput decrease when messages are 100 bytes long. Using the same two tools, we observed that increasing the message size to 512 bytes yields a throughput increase of ~20% going from 1 topic partition to 1000 topic partitions with all other variables held constant. |Kafka Core Testing| | | | | | | | | | | |Enable Transaction|Batch Size (b)|Linger (ms)|Max Inflight|Commit Interval (ms)|Num Records|Record Size (b)|Num Input Topics|1 Partition MB/s|1000 Partitions MB/s|MB/s delta| |FALSE|16384|100|5|1000|2000|100|1|45.633625|31.482193|-31.01%| |FALSE|16384|100|5|1000|2000|512|1|70.217902|85.319107|21.51%| was: We have observed, using two entirely different tools, that a simple Kafka application (read 1 topic and immediately produce to another) suffers substantial performance degradation when scaling up topics. Below is the output of one of these tools, showing that going from 1 partition to 1000 partitions yields a ~30% throughput decrease when messages are 100 bytes long. Using the same two tools, we observed that increasing the message size to 512 bytes yields a throughput increase of ~20% going from 1 topic partition to 1000 topic partitions with all other variables held constant. 
|Kafka Core Testing| | | | | | | | | | | |Enable Transaction|Batch Size (b)|Linger (ms)|Max Inflight|Commit Interval (ms)|Num Records|Record Size (b)|# Input Topics|1 Partition MB/s|1000 Partitions MB/s|MB/s delta| |FALSE|16384|100|5|1000|2000|100|1|45.633625|31.482193|-31.01%| |FALSE|16384|100|5|1000|2000|512|1|70.217902|85.319107|21.51%| > Kafka throughput suffers substantially when scaling topic partitions with > small messages > > > Key: KAFKA-9191 > URL: https://issues.apache.org/jira/browse/KAFKA-9191 > Project: Kafka > Issue Type: Bug >Reporter: Chris Pettitt >Priority: Major > > We have observed, using two entirely different tools, that a simple Kafka > application (read 1 topic and immediately produce to another) suffers > substantial performance degradation when scaling up topics. Below is the > output of one of these tools, showing that going from 1 partition to 1000 > partitions yields a ~30% throughput decrease when messages are 100 bytes long. > Using the same two tools, we observed that increasing the message size to 512 > bytes yields a throughput increase of ~20% going from 1 topic partition to > 1000 topic partitions with all other variables held constant. > > |Kafka Core Testing| | | | | | | | | | | > |Enable Transaction|Batch Size (b)|Linger (ms)|Max Inflight|Commit Interval > (ms)|Num Records|Record Size (b)|Num Input Topics|1 Partition MB/s|1000 > Partitions MB/s|MB/s delta| > |FALSE|16384|100|5|1000|2000|100|1|45.633625|31.482193|-31.01%| > |FALSE|16384|100|5|1000|2000|512|1|70.217902|85.319107|21.51%| -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9191) Kafka throughput suffers substantially when scaling topic partitions with small messages
Chris Pettitt created KAFKA-9191: Summary: Kafka throughput suffers substantially when scaling topic partitions with small messages Key: KAFKA-9191 URL: https://issues.apache.org/jira/browse/KAFKA-9191 Project: Kafka Issue Type: Bug Reporter: Chris Pettitt We have observed, using two entirely different tools, that a simple Kafka application (read 1 topic and immediately produce to another) suffers substantial performance degradation when scaling up topics. Below is the output of one of these tools, showing that going from 1 partition to 1000 partitions yields a ~30% throughput decrease when messages are 100 bytes long. Using the same two tools, we observed that increasing the message size to 512 bytes yields a throughput increase of ~20% going from 1 topic partition to 1000 topic partitions with all other variables held constant.

|Kafka Core Testing| | | | | | | | | | |
|Enable Transaction|Batch Size (b)|Linger (ms)|Max Inflight|Commit Interval (ms)|Num Records|Record Size (b)|# Input Topics|1 Partition MB/s|1000 Partitions MB/s|MB/s delta|
|FALSE|16384|100|5|1000|2000|100|1|45.633625|31.482193|-31.01%|
|FALSE|16384|100|5|1000|2000|512|1|70.217902|85.319107|21.51%|

-- This message was sent by Atlassian Jira (v8.3.4#803005)
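The "MB/s delta" column in the tables above is simply the relative change between the 1-partition and 1000-partition measurements; a quick check (Python used here only for illustration) reproduces both figures:

```python
# Verify the "MB/s delta" column: relative throughput change when going
# from 1 partition to 1000 partitions, for each record size.
def throughput_delta_pct(one_partition_mbps: float, thousand_partition_mbps: float) -> float:
    """Relative change in percent between the two throughput measurements."""
    return (thousand_partition_mbps - one_partition_mbps) / one_partition_mbps * 100

# 100-byte records: throughput drops
print(round(throughput_delta_pct(45.633625, 31.482193), 2))  # -31.01
# 512-byte records: throughput rises
print(round(throughput_delta_pct(70.217902, 85.319107), 2))  # 21.51
```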
[jira] [Commented] (KAFKA-9044) Brokers occasionally (randomly?) dropping out of clusters
[ https://issues.apache.org/jira/browse/KAFKA-9044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974576#comment-16974576 ] Peter Bukowinski commented on KAFKA-9044: - Thanks for your help, as well. You were certainly correct in your hunch that something external to Kafka was causing it. > Brokers occasionally (randomly?) dropping out of clusters > - > > Key: KAFKA-9044 > URL: https://issues.apache.org/jira/browse/KAFKA-9044 > Project: Kafka > Issue Type: Bug > Components: core > Affects Versions: 2.3.0, 2.3.1 > Environment: Ubuntu 14.04 > Reporter: Peter Bukowinski > Priority: Major > > I have several clusters running kafka 2.3.1 and 2.3.0 and this issue has > affected all of them. Because of replication and the size of the clusters (30 > brokers), this bug is not causing any data loss, but it is nevertheless > concerning. When a broker drops out, the log gives no indication that there > are any zookeeper issues (and indeed the zookeepers are healthy when this > occurs). Here's a snippet from a broker log when it occurs: > {{[2019-10-07 11:02:27,630] INFO [GroupMetadataManager brokerId=14] Removed 0 > expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 11:02:56,936] INFO [Log partition=internal_test-52, > dir=/data/3/kl] Found deletable segments with base offsets [1975332] due to > retention time 360ms breach (kafka.log.Log)}} > {{[2019-10-07 11:02:56,936] INFO [Log partition=internal_test-52, > dir=/data/3/kl] Scheduling log segment [baseOffset 1975332, size 92076008] > for deletion. (kafka.log.Log)}} > {{[2019-10-07 11:02:56,936] INFO [Log partition=internal_test-52, > dir=/data/3/kl] Incrementing log start offset to 2000317 (kafka.log.Log)}} > {{[2019-10-07 11:03:56,936] INFO [Log partition=internal_test-52, > dir=/data/3/kl] Deleting segment 1975332 (kafka.log.Log)}} > {{[2019-10-07 11:03:56,957] INFO Deleted log > /data/3/kl/internal_test-52/01975332.log.deleted. 
> (kafka.log.LogSegment)}} > {{[2019-10-07 11:03:56,957] INFO Deleted offset index > /data/3/kl/internal_test-52/01975332.index.deleted. > (kafka.log.LogSegment)}} > {{[2019-10-07 11:03:56,958] INFO Deleted time index > /data/3/kl/internal_test-52/01975332.timeindex.deleted. > (kafka.log.LogSegment)}} > {{[2019-10-07 11:12:27,630] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 11:22:27,629] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 11:32:27,629] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 11:42:27,629] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 11:52:27,629] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 12:02:27,629] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 12:12:27,630] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 12:22:27,629] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 12:32:27,629] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 12:42:27,630] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 1 milliseconds. 
> (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 12:52:27,629] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 13:02:27,630] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 13:12:27,629] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 13:22:27,629] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 13:32:27,629] INFO [Group
[jira] [Commented] (KAFKA-8326) Add List Serde
[ https://issues.apache.org/jira/browse/KAFKA-8326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974570#comment-16974570 ] Sophie Blee-Goldman commented on KAFKA-8326: Did we document this addition to the Serdes anywhere? We probably want to point out the new feature somewhere, and should definitely add any new configs that were introduced to the configs docs. > Add List Serde > - > > Key: KAFKA-8326 > URL: https://issues.apache.org/jira/browse/KAFKA-8326 > Project: Kafka > Issue Type: Improvement > Components: clients, streams > Reporter: Daniyar Yeralin > Assignee: Daniyar Yeralin > Priority: Minor > Labels: kip > > KIP Link: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List%3CT%3E+serialization+and+deserialization] -- This message was sent by Atlassian Jira (v8.3.4#803005)
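As a rough illustration of the KIP-466 idea — serializing a list by repeatedly applying the inner type's serializer to each entry — here is a minimal Python sketch. The count- and size-prefixed byte layout below is an assumption made for this example, not Kafka's actual wire format:

```python
import struct

# Sketch: a List serde delegates each entry to an inner serializer, the way a
# List-of-Strings serde would delegate to a StringSerde. The layout (big-endian
# entry count, then size-prefixed payloads) is illustrative only.
def serialize_list(entries, inner_serialize):
    out = struct.pack(">i", len(entries))                 # number of entries
    for entry in entries:
        payload = inner_serialize(entry)
        out += struct.pack(">i", len(payload)) + payload  # size prefix + bytes
    return out

def deserialize_list(data, inner_deserialize):
    count = struct.unpack_from(">i", data, 0)[0]
    pos, result = 4, []
    for _ in range(count):
        size = struct.unpack_from(">i", data, pos)[0]
        pos += 4
        result.append(inner_deserialize(data[pos:pos + size]))
        pos += size
    return result

# Round-trip with a StringSerde analogue as the inner serializer/deserializer
strings = ["kafka", "list", ""]
blob = serialize_list(strings, lambda s: s.encode("utf-8"))
assert deserialize_list(blob, lambda b: b.decode("utf-8")) == strings
```

Any inner serde (integers, Avro records, etc.) can be swapped in, which is the point of parameterizing the serde by the element type T.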
[jira] [Commented] (KAFKA-9044) Brokers occasionally (randomly?) dropping out of clusters
[ https://issues.apache.org/jira/browse/KAFKA-9044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974548#comment-16974548 ] Jun Rao commented on KAFKA-9044: [~pmbuko]: Thanks for the update. > Brokers occasionally (randomly?) dropping out of clusters > - > > Key: KAFKA-9044 > URL: https://issues.apache.org/jira/browse/KAFKA-9044 > Project: Kafka > Issue Type: Bug > Components: core > Affects Versions: 2.3.0, 2.3.1 > Environment: Ubuntu 14.04 > Reporter: Peter Bukowinski > Priority: Major
[jira] [Updated] (KAFKA-8874) KIP-517: Add consumer metrics to observe user poll behavior
[ https://issues.apache.org/jira/browse/KAFKA-8874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kevin Lu updated KAFKA-8874: Description: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-517%3A+Add+consumer+metrics+to+observe+user+poll+behavior] It would be beneficial to add a metric to record the average/max time between calls to poll as it can be used by both Kafka application owners and operators to: * Easily identify if/when max.poll.interval.ms needs to be changed (and to what value) * View trends/patterns * Verify max.poll.interval.ms was hit using the max metric when debugging consumption issues (if logs are not available) * Configure alerts to notify when average/max time is too close to max.poll.interval.ms was: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-517%3A+Add+consumer+metric+indicating+time+between+poll+calls] It would be beneficial to add a metric to record the average/max time between calls to poll as it can be used by both Kafka application owners and operators to: * Easily identify if/when max.poll.interval.ms needs to be changed (and to what value) * View trends/patterns * Verify max.poll.interval.ms was hit using the max metric when debugging consumption issues (if logs are not available) * Configure alerts to notify when average/max time is too close to max.poll.interval.ms > KIP-517: Add consumer metrics to observe user poll behavior > --- > > Key: KAFKA-8874 > URL: https://issues.apache.org/jira/browse/KAFKA-8874 > Project: Kafka > Issue Type: New Feature > Components: consumer, metrics >Reporter: Kevin Lu >Assignee: Kevin Lu >Priority: Minor > Fix For: 2.4.0 > > > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-517%3A+Add+consumer+metrics+to+observe+user+poll+behavior] > It would be beneficial to add a metric to record the average/max time between > calls to poll as it can be used by both Kafka application owners and > operators to: > * Easily identify if/when max.poll.interval.ms needs to be 
changed (and to > what value) > * View trends/patterns > * Verify max.poll.interval.ms was hit using the max metric when debugging > consumption issues (if logs are not available) > * Configure alerts to notify when average/max time is too close to > max.poll.interval.ms -- This message was sent by Atlassian Jira (v8.3.4#803005)
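A minimal sketch of the metric KIP-517 proposes — recording the average/max time between successive poll() calls so it can be compared and alerted on against max.poll.interval.ms. Class and method names below are hypothetical, not the consumer's actual metrics implementation:

```python
# Hypothetical model of the KIP-517 metric: track intervals between poll()
# calls and expose average and max, the values an operator would alert on
# when they approach max.poll.interval.ms.
class PollIntervalMetrics:
    def __init__(self):
        self._last_poll_ms = None
        self._intervals = []

    def record_poll(self, now_ms):
        if self._last_poll_ms is not None:
            self._intervals.append(now_ms - self._last_poll_ms)
        self._last_poll_ms = now_ms

    @property
    def avg_ms(self):
        return sum(self._intervals) / len(self._intervals) if self._intervals else 0.0

    @property
    def max_ms(self):
        return max(self._intervals, default=0)

m = PollIntervalMetrics()
for t in (0, 100, 350, 400):   # simulated poll timestamps in ms
    m.record_poll(t)
# intervals are 100, 250, 50 -> avg 400/3 ms, max 250 ms
print(m.avg_ms, m.max_ms)
```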
[jira] [Updated] (KAFKA-8874) KIP-517: Add consumer metrics to observe user poll behavior
[ https://issues.apache.org/jira/browse/KAFKA-8874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kevin Lu updated KAFKA-8874: Summary: KIP-517: Add consumer metrics to observe user poll behavior (was: KIP-517: Add consumer metric indicating time between poll calls) > KIP-517: Add consumer metrics to observe user poll behavior > --- > > Key: KAFKA-8874 > URL: https://issues.apache.org/jira/browse/KAFKA-8874 > Project: Kafka > Issue Type: New Feature > Components: consumer, metrics >Reporter: Kevin Lu >Assignee: Kevin Lu >Priority: Minor > Fix For: 2.4.0 > > > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-517%3A+Add+consumer+metric+indicating+time+between+poll+calls] > It would be beneficial to add a metric to record the average/max time between > calls to poll as it can be used by both Kafka application owners and > operators to: > * Easily identify if/when max.poll.interval.ms needs to be changed (and to > what value) > * View trends/patterns > * Verify max.poll.interval.ms was hit using the max metric when debugging > consumption issues (if logs are not available) > * Configure alerts to notify when average/max time is too close to > max.poll.interval.ms -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9178) restoredPartitions is not cleared until the last restoring task completes
[ https://issues.apache.org/jira/browse/KAFKA-9178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar updated KAFKA-9178: - Fix Version/s: 2.4.0 > restoredPartitions is not cleared until the last restoring task completes > - > > Key: KAFKA-9178 > URL: https://issues.apache.org/jira/browse/KAFKA-9178 > Project: Kafka > Issue Type: Bug > Affects Versions: 2.4.0 > Reporter: Boyang Chen > Assignee: Boyang Chen > Priority: Blocker > Labels: streams > Fix For: 2.4.0 > > > We check that the `active` set is empty during closeLostTasks(). However, we don't > currently properly clear the {{restoredPartitions}} set in some edge cases: > We only remove partitions from {{restoredPartitions}} when a) all tasks are > done restoring, at which point we clear it entirely (in > {{AssignedStreamTasks#updateRestored}}), or b) one task at a time, when that > task is restoring and is closed. > Say some partitions were still restoring while others had completed and > transitioned to running when a rebalance occurs. The still-restoring tasks > are all revoked and closed immediately, and their partitions removed from > {{restoredPartitions}}. We also suspend & revoke some running tasks that have > finished restoring, and remove them from {{running}}/{{runningByPartition}}. > Now we have only running tasks left, so in > {{TaskManager#updateNewAndRestoringTasks}} we don't ever even call > {{AssignedStreamTasks#updateRestored}} and therefore we never get to clear > {{restoredPartitions}}. We then close each of the currently running tasks and > remove their partitions from everything, BUT we never got to remove or clear > the partitions of the running tasks that we revoked previously. > It turns out we can't just rely on removing from {{restoredPartitions}} upon > completion since the partitions will just be added back to it during the next > loop (blocked by KAFKA-9177). 
For now, we should just remove partitions from > {{restoredPartitions}} when closing or suspending running tasks as well. -- This message was sent by Atlassian Jira (v8.3.4#803005)
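The proposed fix can be modeled with a toy example: a task's partitions must be removed from restoredPartitions whenever the task is closed or suspended, not only when all tasks finish restoring. All names below are illustrative, not Streams' actual classes:

```python
# Toy model of the bookkeeping fix described above: restoredPartitions
# leaks entries for tasks revoked mid-rebalance unless close/suspend
# always removes that task's partitions.
class Assignment:
    def __init__(self):
        self.restored_partitions = set()

    def task_restored(self, partitions):
        self.restored_partitions |= set(partitions)

    def close_or_suspend_task(self, partitions):
        # The fix: always drop the task's partitions here, even when the
        # task had already transitioned to running.
        self.restored_partitions -= set(partitions)

asg = Assignment()
asg.task_restored({"t-0", "t-1"})
asg.close_or_suspend_task({"t-0"})   # revoked during rebalance
print(asg.restored_partitions)        # {'t-1'} - no leaked entry for t-0
```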
[jira] [Resolved] (KAFKA-9044) Brokers occasionally (randomly?) dropping out of clusters
[ https://issues.apache.org/jira/browse/KAFKA-9044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Peter Bukowinski resolved KAFKA-9044. - Resolution: Not A Bug Issue traced to external cause. > Brokers occasionally (randomly?) dropping out of clusters > - > > Key: KAFKA-9044 > URL: https://issues.apache.org/jira/browse/KAFKA-9044 > Project: Kafka > Issue Type: Bug > Components: core > Affects Versions: 2.3.0, 2.3.1 > Environment: Ubuntu 14.04 > Reporter: Peter Bukowinski > Priority: Major
[jira] [Created] (KAFKA-9190) Server leaves connections with expired authentication sessions open
Jason Gustafson created KAFKA-9190: -- Summary: Server leaves connections with expired authentication sessions open Key: KAFKA-9190 URL: https://issues.apache.org/jira/browse/KAFKA-9190 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Jason Gustafson SocketServer implements some logic to disconnect connections which have expired authentication sessions. At the moment, we just call `SelectionKey.cancel` in order to trigger this disconnect. I think the expectation is that this causes the channel to be closed on the next `poll`, but as far as I can tell, all it does is disassociate the selection key from the selector. This means that the key never gets selected again and we never close the connection until the client times out. This was found when debugging the flaky test failure `EndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl`. I modified the code to call `Selector.close` instead of `TransportLayer.disconnect`. I was able to reproduce the session authentication expiration, but the connection now closes properly and the test no longer times out. -- This message was sent by Atlassian Jira (v8.3.4#803005)
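The failure mode — cancelling a selection key deregisters it without closing the connection — can be illustrated with Python's selectors module as a rough analogue of Java NIO (this is not Kafka's actual code):

```python
import selectors
import socket

# After unregistering (analogous to SelectionKey.cancel), the selector will
# never report events for this socket again, yet the connection stays open:
# only an explicit close() actually tears it down.
a, b = socket.socketpair()
sel = selectors.DefaultSelector()
sel.register(a, selectors.EVENT_READ)
sel.unregister(a)        # key is gone from the selector...
b.send(b"ping")
data = a.recv(4)         # ...but the socket still transfers data
print(data)              # b'ping'
a.close(); b.close(); sel.close()
```

This mirrors the bug report: the cancelled key is simply forgotten, so the broker-side connection lingers until the client times out, whereas closing the channel ends it immediately.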
[jira] [Commented] (KAFKA-9169) Standby Tasks point ask for incorrect offsets on resuming post suspension
[ https://issues.apache.org/jira/browse/KAFKA-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974430#comment-16974430 ] John Roesler commented on KAFKA-9169: - Aha, that makes more sense... Thanks, [~mjsax] n.b., 2.3.0 is probably not truly the first affected version, but it's the first *confirmed* affected version. For this issue, we can only determine an affected version empirically, so we would have to port the system test from the PR to each old version we want to consider. Just wanted to make that clear in case someone wants to know later on if a specific older version < 2.3.0 is affected. The answer is, "unknown, but we can find out". > Standby Tasks point ask for incorrect offsets on resuming post suspension > - > > Key: KAFKA-9169 > URL: https://issues.apache.org/jira/browse/KAFKA-9169 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.3.0 > Reporter: Navinder Brar > Assignee: John Roesler > Priority: Critical > Fix For: 2.5.0 > > > In versions (check 2.0) where standby tasks are suspended on each rebalance, > the checkpoint file is updated after the flush, and the expected behaviour is > that when the same standby task gets assigned back to the same machine after > assignment, it will start reading data from the changelog from the same > offset where it left off. > > But there appears to be a bug in the code: every time post rebalance, it starts > reading from the offset from which it read the first time the task was > assigned on this machine. This has 2 repercussions: > # After every rebalance the standby tasks start restoring a huge amount of > data which they have already restored earlier (verified via a 300x increase in > network IO on all streams instances post rebalance, even with no change in > assignment). > # If the changelog has time-based retention, those offsets will not be available in the > changelog, which leads to offsetOutOfRange exceptions and the stores get > deleted and recreated again. 
> > I have gone through the code and I think I know the issue. > In TaskManager#updateNewAndRestoringTasks(), the function > assignStandbyPartitions() gets called for all the running standby tasks, where > it populates the Map checkpointedOffsets from > standbyTask.checkpointedOffsets(), which is only updated at the time of > initialization of a StandbyTask (i.e., in its constructor). > > This has an easy fix. > Post resumption, we are reading standbyTask.checkpointedOffsets() to learn the > offset from which the standby task should start running, and not > stateMgr.checkpointed(), which gets updated on every commit to the checkpoint > file. In the former case it always reads from the same offsets, even those > which it had already read earlier, and in cases where the changelog topic has a > retention time, it gives offsetOutOfRange exceptions. So > standbyTask.checkpointedOffsets() is quite useless, and we should use > stateMgr.checkpointed() instead to return offsets to the task manager. -- This message was sent by Atlassian Jira (v8.3.4#803005)
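The difference between the two offset sources can be seen in a toy model (illustrative names only, not Streams' actual API): a snapshot captured in the task's constructor goes stale, while reading through the state manager reflects every checkpoint commit:

```python
# Toy model of the bug: the standby task caches checkpointed offsets in its
# constructor, so on resume it reports the stale snapshot instead of the
# offsets committed since.
class StateManager:
    def __init__(self):
        self.offsets = {"changelog-0": 0}

    def checkpoint(self, partition, offset):
        self.offsets[partition] = offset

class StandbyTask:
    def __init__(self, state_mgr):
        self.state_mgr = state_mgr
        self._ctor_snapshot = dict(state_mgr.offsets)  # captured once, never updated

    def checkpointed_offsets_buggy(self):
        return self._ctor_snapshot                     # what the bug reads

    def checkpointed_offsets_fixed(self):
        return self.state_mgr.offsets                  # the stateMgr.checkpointed() analogue

mgr = StateManager()
task = StandbyTask(mgr)
mgr.checkpoint("changelog-0", 5000)       # progress committed after resume
print(task.checkpointed_offsets_buggy())  # {'changelog-0': 0} -> re-restores everything
print(task.checkpointed_offsets_fixed())  # {'changelog-0': 5000}
```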
[jira] [Created] (KAFKA-9189) Shutdown is blocked if connection to Zookeeper is lost
Boris Granveaud created KAFKA-9189: -- Summary: Shutdown is blocked if connection to Zookeeper is lost Key: KAFKA-9189 URL: https://issues.apache.org/jira/browse/KAFKA-9189 Project: Kafka Issue Type: Bug Components: core Affects Versions: 2.3.0 Environment: Linux, Docker 19.03.4 Reporter: Boris Granveaud We are using Kafka and Zookeeper in Docker swarm stacks. When we undeploy a stack, sometimes Kafka doesn't shut down properly and is finally killed by Docker (thus leaving potentially corrupted files). Here are the steps to reproduce (simple Docker, no swarm):
{code:java}
docker network create test
docker run -d --network test --name zk --rm zookeeper:3.5.6
docker run --network test --name kf --rm -e "KAFKA_ZOOKEEPER_CONNECT=zk:2181" -e "KAFKA_ADVERTISED_LISTENERS=INSIDE://:9091" -e "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT" -e "KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE" confluentinc/cp-kafka:5.3.1
{code}
In another shell:
{code:java}
docker stop zk
docker stop kf
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9188) Flaky Test SslAdminClientIntegrationTest.testSynchronousAuthorizerAclUpdatesBlockRequestThreads
Bill Bejeck created KAFKA-9188: -- Summary: Flaky Test SslAdminClientIntegrationTest.testSynchronousAuthorizerAclUpdatesBlockRequestThreads Key: KAFKA-9188 URL: https://issues.apache.org/jira/browse/KAFKA-9188 Project: Kafka Issue Type: Test Components: core Reporter: Bill Bejeck Failed in [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/9373/testReport/junit/kafka.api/SslAdminClientIntegrationTest/testSynchronousAuthorizerAclUpdatesBlockRequestThreads/] {noformat} Error Messagejava.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.Stacktracejava.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout. at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) at kafka.api.SslAdminClientIntegrationTest.$anonfun$testSynchronousAuthorizerAclUpdatesBlockRequestThreads$1(SslAdminClientIntegrationTest.scala:201) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at kafka.api.SslAdminClientIntegrationTest.testSynchronousAuthorizerAclUpdatesBlockRequestThreads(SslAdminClientIntegrationTest.scala:201) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout. Standard Output[2019-11-14 15:13:51,489] ERROR [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Error for partition mytopic1-1 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. [2019-11-14 15:13:51,490] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition mytopic1-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. 
[2019-11-14 15:14:04,686] ERROR [KafkaApi-2] Error when handling request: clientId=adminclient-644, correlationId=4, api=CREATE_ACLS, version=1, body={creations=[{resource_type=2,resource_name=foobar,resource_pattern_type=3,principal=User:ANONYMOUS,host=*,operation=3,permission_type=3},{resource_type=5,resource_name=transactional_id,resource_pattern_type=3,principal=User:ANONYMOUS,host=*,operation=4,permission_type=3}]} (kafka.server.KafkaApis:76) org.apache.kafka.common.errors.ClusterAuthorizationException: Request Request(processor=1, connectionId=127.0.0.1:41993-127.0.0.1:34770-0, session=Session(User:ANONYMOUS,/127.0.0.1), listenerName=ListenerName(SSL), securityProtocol=SSL, buffer=null) is not authorized. [2019-11-14 15:14:04,689] ERROR [KafkaApi-2] Error when handling request: clientId=adminclient-644, correlationId=5, api=DELETE_ACLS, version=1, body={filters=[{resource_type=2,resource_name=foobar,resource_pattern_type_filter=3,principal=User:ANONYMOUS,host=*,operation=3,permission_type=3},{resource_type=5,resource_name=transactional_id,resource_pattern_type_filter=3,principal=User:ANONYMOUS,host=*,operation=4,permission_type=3}]} (kafka.server.KafkaApis:76) org.apache.kafka.
[jira] [Commented] (KAFKA-9181) Flaky test kafka.api.SaslGssapiSslEndToEndAuthorizationTest.testNoConsumeWithoutDescribeAclViaSubscribe
[ https://issues.apache.org/jira/browse/KAFKA-9181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974343#comment-16974343 ] Bill Bejeck commented on KAFKA-9181: Failed again in [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/3358/] on 11/14/19 {noformat} Error Messageorg.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [topic2]Stacktraceorg.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [topic2] Standard OutputAdding ACLs for resource `ResourcePattern(resourceType=CLUSTER, name=kafka-cluster, patternType=LITERAL)`: (principal=User:kafka, host=*, operation=CLUSTER_ACTION, permissionType=ALLOW) Current ACLs for resource `Cluster:LITERAL:kafka-cluster`: User:kafka has Allow permission for operations: ClusterAction from hosts: * Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=*, patternType=LITERAL)`: (principal=User:kafka, host=*, operation=READ, permissionType=ALLOW) Current ACLs for resource `Topic:LITERAL:*`: User:kafka has Allow permission for operations: Read from hosts: * Debug is true storeKey true useTicketCache false useKeyTab true doNotPrompt false ticketCache is null isInitiator true KeyTab is /tmp/kafka15290895624380393501.tmp refreshKrb5Config is false principal is kafka/localh...@example.com tryFirstPass is false useFirstPass is false storePass is false clearPass is false principal is kafka/localh...@example.com Will use keytab Commit Succeeded [2019-11-14 00:58:37,287] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. 
[2019-11-14 00:58:37,304] ERROR [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. [2019-11-14 00:58:37,474] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition e2etopic-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. [2019-11-14 00:58:37,490] ERROR [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Error for partition e2etopic-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=e2etopic, patternType=LITERAL)`: (principal=User:client, host=*, operation=CREATE, permissionType=ALLOW) (principal=User:client, host=*, operation=WRITE, permissionType=ALLOW) (principal=User:client, host=*, operation=DESCRIBE, permissionType=ALLOW) Current ACLs for resource `Topic:LITERAL:e2etopic`: User:client has Allow permission for operations: Describe from hosts: * User:client has Allow permission for operations: Create from hosts: * User:client has Allow permission for operations: Write from hosts: * Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=e2etopic, patternType=LITERAL)`: (principal=User:client, host=*, operation=READ, permissionType=ALLOW) (principal=User:client, host=*, operation=DESCRIBE, permissionType=ALLOW) Adding ACLs for resource `ResourcePattern(resourceType=GROUP, name=group, patternType=LITERAL)`: (principal=User:client, host=*, operation=READ, permissionType=ALLOW) Current ACLs for resource `Topic:LITERAL:e2etopic`: User:client has Allow permission for operations: Describe from hosts: * User:client has Allow 
permission for operations: Create from hosts: * User:client has Allow permission for operations: Write from hosts: * User:client has Allow permission for operations: Read from hosts: * Current ACLs for resource `Group:LITERAL:group`: User:client has Allow permission for operations: Read from hosts: * Debug is true storeKey true useTicketCache false useKeyTab true doNotPrompt false ticketCache is null isInitiator true KeyTab is /tmp/kafka6561302204353356505.tmp refreshKrb5Config is false principal is cli...@example.com tryFirstPass is false useFirstPass is false storePass is false clearPass is false principal is cli...@example.com Will use keytab Commit Succeeded Debug is true storeKey true useTicketCache false useKeyTab true doNotPrompt false ticketCache is null isInitiator true KeyTab is /tmp/kafka6561302204353356505.tmp refreshKrb5Config is false principal is clie...@example.com tryFirstPass is false useFi
[jira] [Created] (KAFKA-9187) kafka.api.SaslGssapiSslEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
Bill Bejeck created KAFKA-9187: -- Summary: kafka.api.SaslGssapiSslEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl Key: KAFKA-9187 URL: https://issues.apache.org/jira/browse/KAFKA-9187 Project: Kafka Issue Type: Test Components: core Reporter: Bill Bejeck Failed in [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/26593/] {noformat} Error Messageorg.scalatest.exceptions.TestFailedException: Consumed 0 records before timeout instead of the expected 1 recordsStacktraceorg.scalatest.exceptions.TestFailedException: Consumed 0 records before timeout instead of the expected 1 records at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:530) at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) at org.scalatest.Assertions$class.fail(Assertions.scala:1091) at org.scalatest.Assertions$.fail(Assertions.scala:1389) at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:842) at kafka.utils.TestUtils$.pollRecordsUntilTrue(TestUtils.scala:793) at kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:1334) at kafka.utils.TestUtils$.consumeRecords(TestUtils.scala:1343) at kafka.api.EndToEndAuthorizationTest.consumeRecords(EndToEndAuthorizationTest.scala:530) at kafka.api.EndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(EndToEndAuthorizationTest.scala:369) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at org.junit.runners.ParentRunner.run(ParentRunner.java:412) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:498) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at
[jira] [Updated] (KAFKA-9178) restoredPartitions is not cleared until the last restoring task completes
[ https://issues.apache.org/jira/browse/KAFKA-9178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar updated KAFKA-9178: - Fix Version/s: (was: 2.4.0) > restoredPartitions is not cleared until the last restoring task completes > - > > Key: KAFKA-9178 > URL: https://issues.apache.org/jira/browse/KAFKA-9178 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.4.0 >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Blocker > Labels: streams > > We check that the `active` set is empty during closeLostTasks(). However, we don't > currently properly clear the {{restoredPartitions}} set in some edge cases: > We only remove partitions from {{restoredPartitions}} when a) all tasks are > done restoring, at which point we clear it entirely (in > {{AssignedStreamTasks#updateRestored}}), or b) one task at a time, when that > task is restoring and is closed. > Say some partitions were still restoring while others had completed and > transitioned to running when a rebalance occurs. The still-restoring tasks > are all revoked and closed immediately, and their partitions removed from > {{restoredPartitions}}. We also suspend & revoke some running tasks that have > finished restoring, and remove them from {{running}}/{{runningByPartition}}. > Now we have only running tasks left, so in > {{TaskManager#updateNewAndRestoringTasks}} we don't ever even call > {{AssignedStreamTasks#updateRestored}} and therefore we never get to clear > {{restoredPartitions}}. We then close each of the currently running tasks and > remove their partitions from everything, BUT we never got to remove or clear > the partitions of the running tasks that we revoked previously. > It turns out we can't just rely on removing from {{restoredPartitions}} upon > completion since the partitions will just be added back to it during the next > loop (blocked by KAFKA-9177). 
For now, we should just remove partitions from > {{restoredPartitions}} when closing or suspending running tasks as well. -- This message was sent by Atlassian Jira (v8.3.4#803005)
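The bookkeeping hazard described above can be sketched in isolation. The class and method names below (RestorationTracker and friends) are hypothetical and are not Kafka's actual internals; the sketch only illustrates the pattern: a shared set that is cleared solely when all tasks finish restoring leaks entries when some tasks are revoked mid-restore, and the proposed fix is to also drop a task's partitions when it is closed or suspended.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the bookkeeping hazard, not Kafka's real code.
class RestorationTracker {
    private final Set<String> restoredPartitions = new HashSet<>();
    private final Set<String> restoringTasks = new HashSet<>();

    void startRestoring(String task, String partition) {
        restoringTasks.add(task);
        restoredPartitions.add(partition);
    }

    // Existing path: the set is only cleared when NOTHING is left restoring.
    // A task that finishes restoring while others are still in flight leaves
    // its partitions behind.
    void taskFinishedRestoring(String task) {
        restoringTasks.remove(task);
        if (restoringTasks.isEmpty()) {
            restoredPartitions.clear();
        }
    }

    // Proposed fix: also remove a task's partitions when it is closed or
    // suspended, so revoked tasks cannot leave stale entries behind.
    void taskClosedOrSuspended(String task, String partition) {
        restoringTasks.remove(task);
        restoredPartitions.remove(partition);
    }

    int staleEntries() {
        return restoredPartitions.size();
    }
}
```

Replaying the rebalance scenario from the description: task1 finishes restoring (task2 is still restoring, so no clear happens), task2 is then revoked and closed (its partition is removed), and task1's partition stays stale until the suspend/close path also removes it.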
[jira] [Created] (KAFKA-9186) Kafka Connect floods logs with probably bogus error messages from DelegatingClassLoader
Piotr Szczepanik created KAFKA-9186: --- Summary: Kafka Connect floods logs with probably bogus error messages from DelegatingClassLoader Key: KAFKA-9186 URL: https://issues.apache.org/jira/browse/KAFKA-9186 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.3.1 Reporter: Piotr Szczepanik After upgrading Kafka Connect from 2.3.0 to 2.3.1 we discovered a lot of recurring ERROR messages in Connect's logs. {noformat} Plugin class loader for connector: 'com.google.pubsub.kafka.sink.CloudPubSubSinkConnector' was not found. Returning: org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@58f437b0 logger: org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader {noformat} Kafka Connect continues processing topics as it should. We are not using the plugin classloader isolation feature (we do not specify the plugin.path property) because we were seeing classloading deadlocks similar to the ones described in [KAFKA-7421|https://issues.apache.org/jira/browse/KAFKA-7421]. Maybe the level of this message should be lowered? -- This message was sent by Atlassian Jira (v8.3.4#803005)
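Until the message's level is lowered upstream, one possible stopgap (an assumption, not an official recommendation) is to raise the threshold for the logger named in the report in the worker's log4j configuration; note this also hides any genuine errors from that class:

```properties
# Hypothetical workaround sketch for connect-log4j.properties:
# suppress the noisy message by raising this logger's threshold.
log4j.logger.org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader=FATAL
```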
[jira] [Resolved] (KAFKA-9046) Connect worker configs require undocumented 'admin.' prefix to configure DLQ for connectors
[ https://issues.apache.org/jira/browse/KAFKA-9046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar resolved KAFKA-9046. -- Fix Version/s: 2.4.0 2.3.2 Resolution: Fixed Issue resolved by pull request 7525 [https://github.com/apache/kafka/pull/7525] > Connect worker configs require undocumented 'admin.' prefix to configure DLQ > for connectors > --- > > Key: KAFKA-9046 > URL: https://issues.apache.org/jira/browse/KAFKA-9046 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0, 2.4.0, 2.3.1 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Blocker > Fix For: 2.3.2, 2.4.0 > > > The changes for KAFKA-8265 involved [adding a prefix of "admin." to Connect > worker > configs|https://github.com/apache/kafka/pull/6624/files#diff-316d2c222b623ee65e8065863bf4b9ceR606] > that would be used to configure the admin client that's used for connector > DLQs. However, this was never documented in the [corresponding > KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy] > and has broken backwards compatibility with prior Connect releases since > workers without the necessary {{"admin."}}-prefixed properties in their > configuration files will now fail in some circumstances (e.g., when > interacting with a secured Kafka cluster that requires authentication from > all admin clients that interact with it). -- This message was sent by Atlassian Jira (v8.3.4#803005)
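As a sketch of the compatibility break described above (a hypothetical worker config, assuming a SASL_SSL-secured cluster): on the affected versions, security settings already present at the top level of the worker config had to be duplicated under the undocumented {{admin.}} prefix, or the admin client used for connector DLQs would fail to authenticate.

```properties
# Hypothetical Connect worker config fragment for affected versions
# (2.3.0, 2.3.1, 2.4.0 pre-fix):
bootstrap.servers=broker:9092
security.protocol=SASL_SSL
sasl.mechanism=PLAIN

# Undocumented duplication required for the DLQ admin client before the fix;
# after the fix, the top-level settings above are used as the default.
admin.security.protocol=SASL_SSL
admin.sasl.mechanism=PLAIN
```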
[jira] [Commented] (KAFKA-9046) Connect worker configs require undocumented 'admin.' prefix to configure DLQ for connectors
[ https://issues.apache.org/jira/browse/KAFKA-9046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974034#comment-16974034 ] ASF GitHub Bot commented on KAFKA-9046: --- omkreddy commented on pull request #7525: KAFKA-9046: Use top-level worker configs for connector admin clients URL: https://github.com/apache/kafka/pull/7525 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Connect worker configs require undocumented 'admin.' prefix to configure DLQ > for connectors > --- > > Key: KAFKA-9046 > URL: https://issues.apache.org/jira/browse/KAFKA-9046 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0, 2.4.0, 2.3.1 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Blocker > > The changes for KAFKA-8265 involved [adding a prefix of "admin." to Connect > worker > configs|https://github.com/apache/kafka/pull/6624/files#diff-316d2c222b623ee65e8065863bf4b9ceR606] > that would be used to configure the admin client that's used for connector > DLQs. However, this was never documented in the [corresponding > KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy] > and has broken backwards compatibility with prior Connect releases since > workers without the necessary {{"admin."}}-prefixed properties in their > configuration files will now fail in some circumstances (e.g., when > interacting with a secured Kafka cluster that requires authentication from > all admin clients that interact with it). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-9159) Consumer.endOffsets Throw TimeoutException: Failed to get offsets by times in 30000ms after a leader change
[ https://issues.apache.org/jira/browse/KAFKA-9159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16969765#comment-16969765 ] zhangzhanchang edited comment on KAFKA-9159 at 11/14/19 8:21 AM: - This is the modification: if (metadata.updateRequested() || (future.failed() && future.exception() instanceof InvalidMetadataException)). It is compatible with both cases, [~guozhang] was (Author: zzccctv): this modify: if (metadata.updateRequested() || (future.failed() && future.exception() instanceof InvalidMetadataException)), compatible with both cases > Consumer.endOffsets Throw TimeoutException: Failed to get offsets by times in > 30000ms after a leader change > --- > > Key: KAFKA-9159 > URL: https://issues.apache.org/jira/browse/KAFKA-9159 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.10.2.0, 2.0.0 >Reporter: zhangzhanchang >Priority: Major > Attachments: image-2019-11-08-10-28-19-881.png, > image-2019-11-08-10-29-06-282.png > > > case 1: on version 0.10.2, calling Consumer.endOffsets in a loop throws TimeoutException: > Failed to get offsets by times in 30000ms after kill -9 of a broker, but after a leader > change, calling Consumer.endOffsets in a loop causes no problem > !image-2019-11-08-10-28-19-881.png|width=416,height=299! > case 2: on version 2.0, calling Consumer.endOffsets in a loop throws TimeoutException: > Failed to get offsets by times in 30000ms after a leader change, but after kill -9 of a > broker, calling Consumer.endOffsets in a loop causes no problem > !image-2019-11-08-10-29-06-282.png|width=412,height=314! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-9044) Brokers occasionally (randomly?) dropping out of clusters
[ https://issues.apache.org/jira/browse/KAFKA-9044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974007#comment-16974007 ] Peter Bukowinski edited comment on KAFKA-9044 at 11/14/19 8:15 AM: --- Update: I believe I've found the issue. I put all the broker drop-outs from the last month into a spreadsheet and saw that if the same broker dropped out more than once, the minute and second of the hour were the same. I was then able to correlate with the running of a separate hourly process. I'll spare you the details, but suffice it to say, this is most likely not kafka's fault. I have a feeling I'll be closing this issue after I've watched for any repeats of the drop-out behavior after implementing the fix. was (Author: pmbuko): Update: I believe I've found the issue. I put all the broker drop-outs from the last month into a spreadsheet and saw that if the same broker dropped out more than once, the minute and second of the hour were the same. I was then able to correlate with the running of a separate process. I'll spare you the details, but suffice it to say, this is most likely not kafka's fault. I have a feeling I'll be closing this issue after I've watched for any repeats of the drop-out behavior after implementing the fix. > Brokers occasionally (randomly?) dropping out of clusters > - > > Key: KAFKA-9044 > URL: https://issues.apache.org/jira/browse/KAFKA-9044 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.3.0, 2.3.1 > Environment: Ubuntu 14.04 >Reporter: Peter Bukowinski >Priority: Major > > I have several clusters running Kafka 2.3.1 and 2.3.0, and this issue has > affected all of them. Because of replication and the size of the clusters (30 > brokers), this bug is not causing any data loss, but it is nevertheless > concerning. When a broker drops out, the log gives no indication that there > are any zookeeper issues (and indeed the zookeepers are healthy when this > occurs). 
Here's snippet from a broker log when it occurs: > {{[2019-10-07 11:02:27,630] INFO [GroupMetadataManager brokerId=14] Removed 0 > expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 11:02:56,936] INFO [Log partition=internal_test-52, > dir=/data/3/kl] Found deletable segments with base offsets [1975332] due to > retention time 360ms breach (kafka.log.Log)}} > {{[2019-10-07 11:02:56,936] INFO [Log partition=internal_test-52, > dir=/data/3/kl] Scheduling log segment [baseOffset 1975332, size 92076008] > for deletion. (kafka.log.Log)}} > {{[2019-10-07 11:02:56,936] INFO [Log partition=internal_test-52, > dir=/data/3/kl] Incrementing log start offset to 2000317 (kafka.log.Log)}} > {{[2019-10-07 11:03:56,936] INFO [Log partition=internal_test-52, > dir=/data/3/kl] Deleting segment 1975332 (kafka.log.Log)}} > {{[2019-10-07 11:03:56,957] INFO Deleted log > /data/3/kl/internal_test-52/01975332.log.deleted. > (kafka.log.LogSegment)}} > {{[2019-10-07 11:03:56,957] INFO Deleted offset index > /data/3/kl/internal_test-52/01975332.index.deleted. > (kafka.log.LogSegment)}} > {{[2019-10-07 11:03:56,958] INFO Deleted time index > /data/3/kl/internal_test-52/01975332.timeindex.deleted. > (kafka.log.LogSegment)}} > {{[2019-10-07 11:12:27,630] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 11:22:27,629] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 11:32:27,629] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 11:42:27,629] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. 
> (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 11:52:27,629] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 12:02:27,629] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 12:12:27,630] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 12:22:27,629] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 12:32:27,629] INFO [GroupMetadataManager brokerId=14] Rem
[jira] [Commented] (KAFKA-9044) Brokers occasionally (randomly?) dropping out of clusters
[ https://issues.apache.org/jira/browse/KAFKA-9044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974007#comment-16974007 ] Peter Bukowinski commented on KAFKA-9044: - Update: I believe I've found the issue. I put all the broker drop-outs from the last month into a spreadsheet and saw that if the same broker dropped out more than once, the minute and second of the hour were the same. I was then able to correlate with the running of a separate process. I'll spare you the details, but suffice it to say, this is most likely not kafka's fault. I have a feeling I'll be closing this issue after I've watched for any repeats of the drop-out behavior after implementing the fix. > Brokers occasionally (randomly?) dropping out of clusters > - > > Key: KAFKA-9044 > URL: https://issues.apache.org/jira/browse/KAFKA-9044 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.3.0, 2.3.1 > Environment: Ubuntu 14.04 >Reporter: Peter Bukowinski >Priority: Major > > I have several cluster running kafka 2.3.1 and 2.3.0 and this issue has > affected all of them. Because of replication and the size of the clusters (30 > brokers), this bug is not causing any data loss, but it is nevertheless > concerning. When a broker drops out, the log gives no indication that there > are any zookeeper issues (and indeed the zookeepers are healthy when this > occurs. Here's snippet from a broker log when it occurs: > {{[2019-10-07 11:02:27,630] INFO [GroupMetadataManager brokerId=14] Removed 0 > expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 11:02:56,936] INFO [Log partition=internal_test-52, > dir=/data/3/kl] Found deletable segments with base offsets [1975332] due to > retention time 360ms breach (kafka.log.Log)}} > {{[2019-10-07 11:02:56,936] INFO [Log partition=internal_test-52, > dir=/data/3/kl] Scheduling log segment [baseOffset 1975332, size 92076008] > for deletion. 
(kafka.log.Log)}} > {{[2019-10-07 11:02:56,936] INFO [Log partition=internal_test-52, > dir=/data/3/kl] Incrementing log start offset to 2000317 (kafka.log.Log)}} > {{[2019-10-07 11:03:56,936] INFO [Log partition=internal_test-52, > dir=/data/3/kl] Deleting segment 1975332 (kafka.log.Log)}} > {{[2019-10-07 11:03:56,957] INFO Deleted log > /data/3/kl/internal_test-52/01975332.log.deleted. > (kafka.log.LogSegment)}} > {{[2019-10-07 11:03:56,957] INFO Deleted offset index > /data/3/kl/internal_test-52/01975332.index.deleted. > (kafka.log.LogSegment)}} > {{[2019-10-07 11:03:56,958] INFO Deleted time index > /data/3/kl/internal_test-52/01975332.timeindex.deleted. > (kafka.log.LogSegment)}} > {{[2019-10-07 11:12:27,630] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 11:22:27,629] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 11:32:27,629] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 11:42:27,629] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 11:52:27,629] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 12:02:27,629] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 12:12:27,630] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 12:22:27,629] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. 
> (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 12:32:27,629] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 12:42:27,630] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 1 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 12:52:27,629] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)}} > {{[2019-10-07 13:02:27,630] INFO [GroupMetadataManager brokerId=14] Removed > 0 expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadata