[jira] [Updated] (KAFKA-8733) Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.
[ https://issues.apache.org/jira/browse/KAFKA-8733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-8733:
----------------------------------
    Description: 
We found the offline-partitions issue multiple times on some of the hosts in our clusters. After going through the broker logs and the hosts' disk stats, it looks like this issue occurs whenever the read/write operations take more time on that disk. In a particular case where the read time is more than replica.lag.time.max.ms, follower replicas will be out of sync because their earlier fetch requests are stuck while reading the local log and their fetch status is not yet updated, as shown in the below code of `ReplicaManager`. If there is an issue in reading the data from the log for a duration longer than replica.lag.time.max.ms, then all the replicas will be out of sync and the partition becomes offline if min.insync.replicas > 1 and unclean.leader.election.enable is false.

{code:java}
def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
  val result = readFromLocalLog( // this call took more than `replica.lag.time.max.ms`
    replicaId = replicaId,
    fetchOnlyFromLeader = fetchOnlyFromLeader,
    readOnlyCommitted = fetchOnlyCommitted,
    fetchMaxBytes = fetchMaxBytes,
    hardMaxBytesLimit = hardMaxBytesLimit,
    readPartitionInfo = fetchInfos,
    quota = quota,
    isolationLevel = isolationLevel)
  // fetch time gets updated here, but maybeShrinkIsr may have already been
  // called and the replica removed from the ISR
  if (isFromFollower) updateFollowerLogReadResults(replicaId, result)
  else result
}
val logReadResults = readFromLog()
{code}

Attached are the graphs of disk weighted IO time stats when this issue occurred.

I will raise a [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-501+Avoid+offline+partitions+in+the+edgcase+scenario+of+follower+fetch+requests+not+processed+in+time] describing options on how to handle this scenario.

was: (the same description, differing only in that it said "unclean.leader.election is disabled" instead of "unclean.leader.election.enable is false")

> Offline partitions occur when leader's disk is slow in reads while responding
> to follower fetch requests.
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-8733
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8733
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.1.2, 2.4.0
>            Reporter: Satish Duggana
>            Assignee: Satish Duggana
>            Priority: Critical
>         Attachments: weighted-io-time-2.png, wio-time.png
>
> We found the offline-partitions issue multiple times on some of the hosts in
> our clusters. After going through the broker logs and the hosts' disk stats,
> it looks like this issue occurs whenever the read/write operations take more
> time on that disk. In a particular case where the read time is more than
> replica.lag.time.max.ms, follower replicas will be out of sync because their
> earlier fetch requests are stuck while reading the local log and their fetch
> status is not yet updated, as shown in the below code of `ReplicaManager`.
> If there is an issue in reading the data from the log for a duration longer
> than replica.lag.time.max.ms, then all the replicas will be out of sync and
> the partition becomes offline if min.insync.replicas > 1 and
> unclean.leader.election.enable is false.
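To make the failure mode concrete, below is a minimal, illustrative model of the interaction described in the report. This is not the actual Partition/ReplicaManager code; the name lastCaughtUpTimeMs follows the broker's book-keeping, but the structure is simplified: a follower's fetch time is recorded only after readFromLocalLog returns, while the ISR shrink check runs on its own schedule and judges lag from the last recorded time.

{code:java}
// Illustrative sketch, not Kafka source: a single log read slower than
// replica.lag.time.max.ms means no follower refreshes its fetch time,
// so the periodic ISR shrink evicts every follower.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class IsrShrinkSketch {
    private final Map<Integer, Long> lastCaughtUpTimeMs = new ConcurrentHashMap<>();
    private final long replicaLagTimeMaxMs = 10_000L; // replica.lag.time.max.ms

    // Called only AFTER readFromLocalLog returns (cf. updateFollowerLogReadResults).
    void recordFollowerFetch(int replicaId, long nowMs) {
        lastCaughtUpTimeMs.put(replicaId, nowMs);
    }

    // Runs periodically, regardless of fetches still blocked on the disk read.
    void maybeShrinkIsr(long nowMs) {
        lastCaughtUpTimeMs.entrySet()
            .removeIf(e -> nowMs - e.getValue() > replicaLagTimeMaxMs); // follower leaves the ISR
    }
}
{code}

With min.insync.replicas > 1 and unclean leader election disabled, once every follower has been removed this way, the partition has no eligible leader candidate left and goes offline.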
[jira] [Commented] (KAFKA-8716) broker cannot join the cluster after upgrading kafka binary from 2.1.1 to 2.2.1 or 2.3.0
[ https://issues.apache.org/jira/browse/KAFKA-8716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896743#comment-16896743 ]

Ismael Juma commented on KAFKA-8716:
------------------------------------
The documentation is out of date. I'll submit a PR to update it.

> broker cannot join the cluster after upgrading kafka binary from 2.1.1 to
> 2.2.1 or 2.3.0
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-8716
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8716
>             Project: Kafka
>          Issue Type: Bug
>          Components: zkclient
>    Affects Versions: 2.3.0, 2.2.1
>            Reporter: Yu Yang
>            Priority: Critical
>
> We are trying to upgrade the kafka binary from 2.1 to 2.2.1 or 2.3.0. For both
> versions, the broker with the updated binary (2.2.1 or 2.3.0) could not get
> started due to a zookeeper session expiration exception. This error happens
> repeatedly, and the broker cannot start because of it.
> Below is our zk related setting in server.properties:
> {code}
> zookeeper.connection.timeout.ms=6000
> zookeeper.session.timeout.ms=6000
> {code}
> The following is the stack trace, and we are using zookeeper 3.5.3. Instead
> of waiting for a few seconds, the SESSIONEXPIRED error is returned immediately
> by the CheckedEphemeral.create call. Any insights?
> [2019-07-25 18:07:35,712] INFO Creating /brokers/ids/80 (is it secure? false) (kafka.zk.KafkaZkClient)
> [2019-07-25 18:07:35,724] ERROR Error while creating ephemeral at /brokers/ids/80 with return code: SESSIONEXPIRED (kafka.zk.KafkaZkClient$CheckedEphemeral)
> [2019-07-25 18:07:35,731] ERROR [KafkaServer id=80] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
> org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
> at org.apache.zookeeper.KeeperException.create(KeeperException.java:130)
> at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1725)
> at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1689)
> at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:97)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:260)
> at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
> at kafka.Kafka$.main(Kafka.scala:75)
> at kafka.Kafka.main(Kafka.scala)

-- 
This message was sent by Atlassian JIRA
(v7.6.14#76016)
[jira] [Commented] (KAFKA-8716) broker cannot join the cluster after upgrading kafka binary from 2.1.1 to 2.2.1 or 2.3.0
[ https://issues.apache.org/jira/browse/KAFKA-8716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896733#comment-16896733 ]

Xu Zhiyuan commented on KAFKA-8716:
-----------------------------------
[~yuyang08] Although Kafka bundles a newer ZooKeeper version in its binary distribution, the Kafka documentation says that the current stable branch of ZooKeeper is 3.4 and that the latest release of that branch is 3.4.9. Maybe 3.4.9 is an option: [zookeeper|https://kafka.apache.org/23/documentation.html#zk]

> broker cannot join the cluster after upgrading kafka binary from 2.1.1 to
> 2.2.1 or 2.3.0
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-8716
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8716
>             Project: Kafka
>          Issue Type: Bug
>          Components: zkclient
>    Affects Versions: 2.3.0, 2.2.1
>            Reporter: Yu Yang
>            Priority: Critical
>
> We are trying to upgrade the kafka binary from 2.1 to 2.2.1 or 2.3.0. For both
> versions, the broker with the updated binary (2.2.1 or 2.3.0) could not get
> started due to a zookeeper session expiration exception. This error happens
> repeatedly, and the broker cannot start because of it.
> Below is our zk related setting in server.properties:
> {code}
> zookeeper.connection.timeout.ms=6000
> zookeeper.session.timeout.ms=6000
> {code}
> The following is the stack trace, and we are using zookeeper 3.5.3. Instead
> of waiting for a few seconds, the SESSIONEXPIRED error is returned immediately
> by the CheckedEphemeral.create call. Any insights?
> [2019-07-25 18:07:35,712] INFO Creating /brokers/ids/80 (is it secure? false) (kafka.zk.KafkaZkClient)
> [2019-07-25 18:07:35,724] ERROR Error while creating ephemeral at /brokers/ids/80 with return code: SESSIONEXPIRED (kafka.zk.KafkaZkClient$CheckedEphemeral)
> [2019-07-25 18:07:35,731] ERROR [KafkaServer id=80] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
> org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
> at org.apache.zookeeper.KeeperException.create(KeeperException.java:130)
> at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1725)
> at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1689)
> at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:97)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:260)
> at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
> at kafka.Kafka$.main(Kafka.scala:75)
> at kafka.Kafka.main(Kafka.scala)

-- 
This message was sent by Atlassian JIRA
(v7.6.14#76016)
[jira] [Updated] (KAFKA-8738) Cleaning thread blocked when more than one ALTER_REPLICA_LOG_DIRS request is sent
[ https://issues.apache.org/jira/browse/KAFKA-8738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

dingsainan updated KAFKA-8738:
------------------------------
    Description: 
Hi,

I am experiencing a situation where the log cleaner does not work for the related topic-partition after using the kafka-reassign-partitions.sh tool (v2.1.1) more than once in quick succession.

My operation: we first submit a task to migrate a replica to another log dir on the same broker; while that task is still in progress, we submit a new task for the same topic-partition.

{code:java}
// the first task:
{"partitions":
  [{"topic": "lancer_ops_billions_all_log_json_billions",
    "partition": 1,
    "replicas": [6,15],
    "log_dirs": ["any","/data/mnt/storage02/datum/kafka_data"]}]
}
// the second task
{"partitions":
  [{"topic": "lancer_ops_billions_all_log_json_billions",
    "partition": 1,
    "replicas": [6,15],
    "log_dirs": ["any","/data/mnt/storage03/datum/kafka_data"]}]
}
{code}

My analysis: Kafka executes abortAndPauseCleaning() once a task is submitted; shortly after, another task is submitted for the same topic-partition, so the cleaning state is currently {color:#ff}LogCleaningPaused(2){color}. When the second task completes, cleaning is resumed for this topic-partition once. In my case, the previous task is killed directly and no resumeCleaning() is executed for the first task, so when the second task completes, the cleaning state for the topic-partition is still {color:#ff}LogCleaningPaused(1){color}, which blocks the cleaning thread for that topic-partition.

_That's all my analysis, please confirm._

_Thanks_
_Nora_

was: (the same description, without the JSON reassignment examples)

> Cleaning thread blocked when more than one ALTER_REPLICA_LOG_DIRS request is
> sent
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-8738
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8738
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.1.1
>            Reporter: dingsainan
>            Priority: Major
>
> Hi,
>
> I am experiencing a situation where the log cleaner does not work for the
> related topic-partition after using the kafka-reassign-partitions.sh tool
> (v2.1.1) more than once in quick succession.
>
> My operation: we first submit a task to migrate a replica to another log dir
> on the same broker; while that task is still in progress, we submit a new
> task for the same topic-partition.
>
> {code:java}
> // the first task:
> {"partitions":
>   [{"topic": "lancer_ops_billions_all_log_json_billions",
>     "partition": 1,
>     "replicas": [6,15],
>     "log_dirs": ["any","/data/mnt/storage02/datum/kafka_data"]}]
> }
> // the second task
> {"partitions":
>   [{"topic": "lancer_ops_billions_all_log_json_billions",
>     "partition": 1,
>     "replicas": [6,15],
>     "log_dirs": ["any","/data/mnt/storage03/datum/kafka_data"]}]
> }
> {code}
>
> My analysis: Kafka executes abortAndPauseCleaning() once a task is
> submitted; shortly after, another task is submitted for the same
> topic-partition, so the cleaning state is currently
> {color:#ff}LogCleaningPaused(2){color}. When the second task completes,
> cleaning is resumed for this topic-partition once. In my case, the previous
> task is killed directly and no resumeCleaning() is executed for the first
> task, so when the second task completes, the cleaning state for the
> topic-partition is still {color:#ff}LogCleaningPaused(1){color}, which blocks
> the cleaning thread for that topic-partition.
>
> _That's all my analysis, please confirm._
>
> _Thanks_
> _Nora_

-- 
This message was sent by Atlassian JIRA
[jira] [Created] (KAFKA-8738) Cleaning thread blocked when more than one ALTER_REPLICA_LOG_DIRS request is sent
dingsainan created KAFKA-8738:
---------------------------------

             Summary: Cleaning thread blocked when more than one ALTER_REPLICA_LOG_DIRS request is sent
                 Key: KAFKA-8738
                 URL: https://issues.apache.org/jira/browse/KAFKA-8738
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 2.1.1
            Reporter: dingsainan

Hi,

I am experiencing a situation where the log cleaner does not work for the related topic-partition after using the kafka-reassign-partitions.sh tool (v2.1.1) more than once in quick succession.

My operation: we first submit a task to migrate a replica to another log dir on the same broker; while that task is still in progress, we submit a new task for the same topic-partition.

My analysis: Kafka executes abortAndPauseCleaning() once a task is submitted; shortly after, another task is submitted for the same topic-partition, so the cleaning state is currently {color:#ff}LogCleaningPaused(2){color}. When the second task completes, cleaning is resumed for this topic-partition once. In my case, the previous task is killed directly and no resumeCleaning() is executed for the first task, so when the second task completes, the cleaning state for the topic-partition is still {color:#ff}LogCleaningPaused(1){color}, which blocks the cleaning thread for that topic-partition.

_That's all my analysis, please confirm._

_Thanks_
_Nora_

-- 
This message was sent by Atlassian JIRA
(v7.6.14#76016)
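For readers unfamiliar with the pause/resume book-keeping, here is a minimal model of the reference counting described in the report. It is illustrative only, not the actual LogCleanerManager code; the method names mirror the report:

{code:java}
// Each abortAndPauseCleaning() increments a per-partition pause count
// (LogCleaningPaused(n) -> LogCleaningPaused(n+1)); cleaning resumes only
// when the count drops back to zero. If the first reassignment is killed
// without a matching resumeCleaning(), the count is stuck at 1 forever.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class CleaningPauseSketch {
    private final Map<String, Integer> pauseCount = new ConcurrentHashMap<>();

    void abortAndPauseCleaning(String topicPartition) {
        pauseCount.merge(topicPartition, 1, Integer::sum);
    }

    void resumeCleaning(String topicPartition) {
        // Decrement; drop the entry entirely once the count reaches zero.
        pauseCount.computeIfPresent(topicPartition, (tp, n) -> n > 1 ? n - 1 : null);
    }

    boolean isCleanable(String topicPartition) {
        return !pauseCount.containsKey(topicPartition);
    }
}
{code}

In the reported scenario, abortAndPauseCleaning() runs twice but resumeCleaning() only once, so isCleanable() never returns true for that topic-partition again.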
[jira] [Created] (KAFKA-8737) TaskMigratedException while rebalancing Kafka Streams
KUMAR created KAFKA-8737:
----------------------------

             Summary: TaskMigratedException while rebalancing Kafka Streams
                 Key: KAFKA-8737
                 URL: https://issues.apache.org/jira/browse/KAFKA-8737
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 1.0.1, 1.0.0
         Environment: 20 partitions, 1 topic, 8 streamer service instances. Sample consumer-group state (TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET, LAG, CONSUMER-ID/CLIENT-ID):
{noformat}
topic-region-1  9   7841726  8236017  394291  streams-subscriberstopic-region-1-29d615ed-4243-4b9d-90b7-9c517aa0f2e3-StreamThread-1-consumer-0276e83d-40b5-4b44-b764-7d29e0dab663/ streams-subscriberstopic-region-1-29d615ed-4243-4b9d-90b7-9c517aa0f2e3-StreamThread-1-consumer
topic-region-1  15  7421710  7467666  45956   streams-subscriberstopic-region-1-29d615ed-4243-4b9d-90b7-9c517aa0f2e3-StreamThread-1-consumer-0276e83d-40b5-4b44-b764-7d29e0dab663/ streams-subscriberstopic-region-1-29d615ed-4243-4b9d-90b7-9c517aa0f2e3-StreamThread-1-consumer
topic-region-1  19  7737360  8120611  383251  streams-subscriberstopic-region-1-29d615ed-4243-4b9d-90b7-9c517aa0f2e3-StreamThread-1-consumer-0276e83d-40b5-4b44-b764-7d29e0dab663/ streams-subscriberstopic-region-1-29d615ed-4243-4b9d-90b7-9c517aa0f2e3-StreamThread-1-consumer
{noformat}
            Reporter: KUMAR

Kafka Streams throws the following exception while a streams client service restarts:

o.a.k.s.p.internals.StreamThread.? - stream-thread [streams-subscriberstopic-region-1-32d968e3-f892-4772-a7a4-6f684d7e43c9-StreamThread-1] Detected a task that got migrated to another thread. This implies that this thread missed a rebalance and dropped out of the consumer group. Trying to rejoin the consumer group now.
org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of topic-region-1-12 should not change while restoring: old end offset 6286727, current offset 6380997

The Kafka version is 1.0.0, and we have back-merged the fix for [KAFKA-6269|https://github.com/apache/kafka/pull/4300/files]. However, we observe that there seems to be an issue in rebalancing when "auto.offset.reset" is configured as "latest". Based on log analysis we see the following behavior:
# StreamThread starts a restore consumer.
# While fetching, it gets an offset-out-of-range error:
o.a.k.c.consumer.internals.Fetcher.? - [Consumer clientId=streams-subscriberstopic-region-1-11b2d7fb-11ce-4b0b-a40a-388d3c7b6bc9-StreamThread-1-restore-consumer, groupId=] Fetch READ_UNCOMMITTED at offset 246431 for partition topic-region-1-12 returned fetch data (error=OFFSET_OUT_OF_RANGE, highWaterMark=-1, lastStableOffset = -1, logStartOffset = -1, abortedTransactions = null, recordsSizeInBytes=0)
# The Fetcher tries to reset the offset.
# While resetting the offset, it appears to change the offset position, causing the TaskMigratedException.

The same test repeated with "auto.offset.reset" configured as "earliest" does not throw any TaskMigratedException, since in the earliest case we are not resetting the restore consumer's position. Please let us know whether this is expected and whether a fix is needed in the offset-reset logic when set to latest.

-- 
This message was sent by Atlassian JIRA
(v7.6.14#76016)
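As context for the report above, here is a minimal sketch of where the relevant setting is applied in a Streams application. The application id, topics, and bootstrap address are placeholders, and the topology is deliberately trivial:

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class ResetConfigSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("topic-region-1").to("output-topic"); // trivial pass-through topology

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "subscribers-app"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
        // Per the report: with "latest" the restore consumer's position was reset
        // while the changelog end offset kept moving, triggering
        // TaskMigratedException; with "earliest" the exception did not occur.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        new KafkaStreams(builder.build(), props).start();
    }
}
{code}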
[jira] [Commented] (KAFKA-8698) ListOffsets Response protocol documentation
[ https://issues.apache.org/jira/browse/KAFKA-8698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896663#comment-16896663 ]

ASF GitHub Bot commented on KAFKA-8698:
---------------------------------------
asutosh936 commented on pull request #7141: KAFKA-8698 : Updated documentation to remove typo error.
URL: https://github.com/apache/kafka/pull/7141

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> ListOffsets Response protocol documentation
> -------------------------------------------
>
>                 Key: KAFKA-8698
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8698
>             Project: Kafka
>          Issue Type: Bug
>          Components: documentation
>            Reporter: Fábio Silva
>            Assignee: Asutosh Pandya
>            Priority: Minor
>              Labels: documentation
>
> The documentation of ListOffsets Response (Version: 0) appears to have a
> typo in the offsets field name, which is suffixed with `'`.
> {code:java}
> [offsets']{code}

-- 
This message was sent by Atlassian JIRA
(v7.6.14#76016)
[jira] [Assigned] (KAFKA-8698) ListOffsets Response protocol documentation
[ https://issues.apache.org/jira/browse/KAFKA-8698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Asutosh Pandya reassigned KAFKA-8698:
-------------------------------------
    Assignee: Asutosh Pandya

> ListOffsets Response protocol documentation
> -------------------------------------------
>
>                 Key: KAFKA-8698
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8698
>             Project: Kafka
>          Issue Type: Bug
>          Components: documentation
>            Reporter: Fábio Silva
>            Assignee: Asutosh Pandya
>            Priority: Minor
>              Labels: documentation
>
> The documentation of ListOffsets Response (Version: 0) appears to have a
> typo in the offsets field name, which is suffixed with `'`.
> {code:java}
> [offsets']{code}

-- 
This message was sent by Atlassian JIRA
(v7.6.14#76016)
[jira] [Commented] (KAFKA-8736) Performance: ThreadCache uses size() for empty cache check
[ https://issues.apache.org/jira/browse/KAFKA-8736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896629#comment-16896629 ] Sophie Blee-Goldman commented on KAFKA-8736: Thanks for the ticket! Would you be interested in opening a PR with this fix? > Performance: ThreadCache uses size() for empty cache check > -- > > Key: KAFKA-8736 > URL: https://issues.apache.org/jira/browse/KAFKA-8736 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.3.0 >Reporter: Matthew Jarvie >Priority: Major > Attachments: size.patch > > > While load testing Kafka Streams in 2.3.0, we stumbled across a potential > performance improvement. The test showed we were spending 80% of CPU time in > ConcurrentSkipListMap.size(): > > {noformat} > 100% org.apache.kafka.streams.processor.internals.StreamThread.run():774 > 100% org.apache.kafka.streams.processor.internals.StreamThread.runLoop():805 > 96.84% org.apache.kafka.streams.processor.internals.StreamThread.runOnce():890 > 96.84% > org.apache.kafka.streams.processor.internals.TaskManager.process(long):420 > 96.83% > org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(long):199 > 96.4% org.apache.kafka.streams.processor.internals.StreamTask.process():366 > 96.3% > org.apache.kafka.streams.processor.internals.SourceNode.process(java.lang.Object, > java.lang.Object):87 > 96.3% > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, > java.lang.Object):133 > 96.3% > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, > java.lang.Object, org.apache.kafka.streams.processor.To):180 > 96.3% > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(org.apache.kafka.streams.processor.internals.ProcessorNode, > java.lang.Object, java.lang.Object):201 > 96.23% > org.apache.kafka.streams.processor.internals.ProcessorNode.process(java.lang.Object, > java.lang.Object):117 > 96.12% > org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(java.lang.Object, > java.lang.Object):43 > 96.12% > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, > java.lang.Object):133 > 96.12% > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, > java.lang.Object, org.apache.kafka.streams.processor.To):180 > 96.12% > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(org.apache.kafka.streams.processor.internals.ProcessorNode, > java.lang.Object, java.lang.Object):201 > 96.08% > org.apache.kafka.streams.processor.internals.ProcessorNode.process(java.lang.Object, > java.lang.Object):117 > 82.78% > org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(java.lang.Object, > java.lang.Object):169 > 82.78% > org.apache.kafka.streams.processor.internals.ProcessorContextImpl$SessionStoreReadWriteDecorator.put(org.apache.kafka.streams.kstream.Windowed, > java.lang.Object):612 > 82.59% > org.apache.kafka.streams.state.internals.MeteredSessionStore.put(org.apache.kafka.streams.kstream.Windowed, > java.lang.Object):127 > 81.11% > org.apache.kafka.streams.state.internals.CachingSessionStore.put(org.apache.kafka.streams.kstream.Windowed, > java.lang.Object):35 > 81.09% > org.apache.kafka.streams.state.internals.CachingSessionStore.put(org.apache.kafka.streams.kstream.Windowed, > byte[]):131 > 81.09% > 
org.apache.kafka.streams.state.internals.ThreadCache.put(java.lang.String, > org.apache.kafka.common.utils.Bytes, > org.apache.kafka.streams.state.internals.LRUCacheEntry):151 > 80.53% > org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(java.lang.String):238 > 80.53% org.apache.kafka.streams.state.internals.NamedCache.size():266 > 80.53% java.util.concurrent.ConcurrentSkipListMap.size():1639{noformat} > According to > [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentSkipListMap.html#size--], > the size() method has to traverse all elements to get a count. It looks like > the count is being compared against 0 to determine if the map is empty; in > this case, we don't need a full count. Instead, the isEmpty() method should > be used, which just looks for one node. We patched this and gained about 25% > max throughput, and this method disappeared from thread dumps as a hot spot. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (KAFKA-8736) Performance: ThreadCache uses size() for empty cache check
[ https://issues.apache.org/jira/browse/KAFKA-8736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthew Jarvie updated KAFKA-8736: -- Attachment: size.patch > Performance: ThreadCache uses size() for empty cache check > -- > > Key: KAFKA-8736 > URL: https://issues.apache.org/jira/browse/KAFKA-8736 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.3.0 >Reporter: Matthew Jarvie >Priority: Major > Attachments: size.patch > > > While load testing Kafka Streams in 2.3.0, we stumbled across a potential > performance improvement. The test showed we were spending 80% of CPU time in > ConcurrentSkipListMap.size(): > > {noformat} > 100% org.apache.kafka.streams.processor.internals.StreamThread.run():774 > 100% org.apache.kafka.streams.processor.internals.StreamThread.runLoop():805 > 96.84% org.apache.kafka.streams.processor.internals.StreamThread.runOnce():890 > 96.84% > org.apache.kafka.streams.processor.internals.TaskManager.process(long):420 > 96.83% > org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(long):199 > 96.4% org.apache.kafka.streams.processor.internals.StreamTask.process():366 > 96.3% > org.apache.kafka.streams.processor.internals.SourceNode.process(java.lang.Object, > java.lang.Object):87 > 96.3% > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, > java.lang.Object):133 > 96.3% > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, > java.lang.Object, org.apache.kafka.streams.processor.To):180 > 96.3% > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(org.apache.kafka.streams.processor.internals.ProcessorNode, > java.lang.Object, java.lang.Object):201 > 96.23% > org.apache.kafka.streams.processor.internals.ProcessorNode.process(java.lang.Object, > java.lang.Object):117 > 96.12% > org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(java.lang.Object, > java.lang.Object):43 > 96.12% > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, > java.lang.Object):133 > 96.12% > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, > java.lang.Object, org.apache.kafka.streams.processor.To):180 > 96.12% > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(org.apache.kafka.streams.processor.internals.ProcessorNode, > java.lang.Object, java.lang.Object):201 > 96.08% > org.apache.kafka.streams.processor.internals.ProcessorNode.process(java.lang.Object, > java.lang.Object):117 > 82.78% > org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(java.lang.Object, > java.lang.Object):169 > 82.78% > org.apache.kafka.streams.processor.internals.ProcessorContextImpl$SessionStoreReadWriteDecorator.put(org.apache.kafka.streams.kstream.Windowed, > java.lang.Object):612 > 82.59% > org.apache.kafka.streams.state.internals.MeteredSessionStore.put(org.apache.kafka.streams.kstream.Windowed, > java.lang.Object):127 > 81.11% > org.apache.kafka.streams.state.internals.CachingSessionStore.put(org.apache.kafka.streams.kstream.Windowed, > java.lang.Object):35 > 81.09% > org.apache.kafka.streams.state.internals.CachingSessionStore.put(org.apache.kafka.streams.kstream.Windowed, > byte[]):131 > 81.09% > org.apache.kafka.streams.state.internals.ThreadCache.put(java.lang.String, > org.apache.kafka.common.utils.Bytes, > 
org.apache.kafka.streams.state.internals.LRUCacheEntry):151 > 80.53% > org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(java.lang.String):238 > 80.53% org.apache.kafka.streams.state.internals.NamedCache.size():266 > 80.53% java.util.concurrent.ConcurrentSkipListMap.size():1639{noformat} > According to > [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentSkipListMap.html#size--], > the size() method has to traverse all elements to get a count. It looks like > the count is being compared against 0 to determine if the map is empty; in > this case, we don't need a full count. Instead, the isEmpty() method should > be used, which just looks for one node. We patched this and gained about 25% > max throughput, and this method disappeared from thread dumps as a hot spot. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (KAFKA-8736) Performance: ThreadCache uses size() for empty cache check
Matthew Jarvie created KAFKA-8736: - Summary: Performance: ThreadCache uses size() for empty cache check Key: KAFKA-8736 URL: https://issues.apache.org/jira/browse/KAFKA-8736 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 2.3.0 Reporter: Matthew Jarvie Attachments: size.patch While load testing Kafka Streams in 2.3.0, we stumbled across a potential performance improvement. The test showed we were spending 80% of CPU time in ConcurrentSkipListMap.size(): {noformat} 100% org.apache.kafka.streams.processor.internals.StreamThread.run():774 100% org.apache.kafka.streams.processor.internals.StreamThread.runLoop():805 96.84% org.apache.kafka.streams.processor.internals.StreamThread.runOnce():890 96.84% org.apache.kafka.streams.processor.internals.TaskManager.process(long):420 96.83% org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(long):199 96.4% org.apache.kafka.streams.processor.internals.StreamTask.process():366 96.3% org.apache.kafka.streams.processor.internals.SourceNode.process(java.lang.Object, java.lang.Object):87 96.3% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, java.lang.Object):133 96.3% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, java.lang.Object, org.apache.kafka.streams.processor.To):180 96.3% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(org.apache.kafka.streams.processor.internals.ProcessorNode, java.lang.Object, java.lang.Object):201 96.23% org.apache.kafka.streams.processor.internals.ProcessorNode.process(java.lang.Object, java.lang.Object):117 96.12% org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(java.lang.Object, java.lang.Object):43 96.12% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, java.lang.Object):133 96.12% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, java.lang.Object, org.apache.kafka.streams.processor.To):180 96.12% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(org.apache.kafka.streams.processor.internals.ProcessorNode, java.lang.Object, java.lang.Object):201 96.08% org.apache.kafka.streams.processor.internals.ProcessorNode.process(java.lang.Object, java.lang.Object):117 82.78% org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(java.lang.Object, java.lang.Object):169 82.78% org.apache.kafka.streams.processor.internals.ProcessorContextImpl$SessionStoreReadWriteDecorator.put(org.apache.kafka.streams.kstream.Windowed, java.lang.Object):612 82.59% org.apache.kafka.streams.state.internals.MeteredSessionStore.put(org.apache.kafka.streams.kstream.Windowed, java.lang.Object):127 81.11% org.apache.kafka.streams.state.internals.CachingSessionStore.put(org.apache.kafka.streams.kstream.Windowed, java.lang.Object):35 81.09% org.apache.kafka.streams.state.internals.CachingSessionStore.put(org.apache.kafka.streams.kstream.Windowed, byte[]):131 81.09% org.apache.kafka.streams.state.internals.ThreadCache.put(java.lang.String, org.apache.kafka.common.utils.Bytes, org.apache.kafka.streams.state.internals.LRUCacheEntry):151 80.53% org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(java.lang.String):238 80.53% org.apache.kafka.streams.state.internals.NamedCache.size():266 80.53% java.util.concurrent.ConcurrentSkipListMap.size():1639{noformat} According to 
[https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentSkipListMap.html#size--], the size() method has to traverse all elements to get a count. It looks like the count is being compared against 0 to determine if the map is empty; in this case, we don't need a full count. Instead, the isEmpty() method should be used, which just looks for one node. We patched this and gained about 25% max throughput, and this method disappeared from thread dumps as a hot spot. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
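A sketch of the suggested change follows. The class is a simplified stand-in for NamedCache/ThreadCache (it is not the actual Streams source); it only contrasts the two emptiness checks on a ConcurrentSkipListMap:

{code:java}
import java.util.concurrent.ConcurrentSkipListMap;

class EmptinessCheckSketch {
    private final ConcurrentSkipListMap<String, byte[]> cache = new ConcurrentSkipListMap<>();

    // Before: size() traverses every node of the skip list, so calling it on
    // each put makes the eviction check O(n) just to learn "non-empty".
    boolean hasEntriesViaSize() {
        return cache.size() != 0;
    }

    // After: isEmpty() only needs to find (or fail to find) a first node.
    boolean hasEntriesViaIsEmpty() {
        return !cache.isEmpty();
    }
}
{code}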
[jira] [Commented] (KAFKA-8735) BrokerMetadataCheckPoint should check metadata.properties existence itself
[ https://issues.apache.org/jira/browse/KAFKA-8735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896526#comment-16896526 ]

ASF GitHub Bot commented on KAFKA-8735:
---------------------------------------
qinghui-xu commented on pull request #7139: KAFKA-8735: Check properties file existence first
URL: https://github.com/apache/kafka/pull/7139

To make BrokerMetadataCheckpoint more robust, and avoid a leak of abstraction. Details and rationale are in the jira ticket.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> BrokerMetadataCheckPoint should check metadata.properties existence itself
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-8735
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8735
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Qinghui Xu
>            Priority: Major
>
> BrokerMetadataCheckPoint tries to read metadata.properties from the log
> directory during server startup, and it relies on
> org.apache.kafka.common.utils.Utils (from org.apache.kafka:kafka-clients) to
> load the properties file in a given directory.
> During the process, we need to handle the case in which the properties file
> does not exist (not as an error). Currently, BrokerMetadataCheckPoint relies
> on the behavior of `org.apache.kafka.common.utils.Utils#loadProps` to find
> out whether the file exists: if the properties file is absent, it expects a
> NoSuchFileException (for branch 2.1 and above), whereas it expected a
> FileNotFoundException (for branch 2.0 and before). Given that the
> `org.apache.kafka.common.utils.Utils#loadProps` signature throws only
> IOException, this exception pattern matching is a leak of abstraction that
> makes BrokerMetadataCheckPoint rely on the implementation details of
> `org.apache.kafka.common.utils.Utils#loadProps`.
> This makes BrokerMetadataCheckPoint very fragile, especially when
> `org.apache.kafka.common.utils.Utils` and
> `kafka.server.BrokerMetadataCheckPoint` come from different artifacts. An
> example that I just ran into:
> * We have a project A that depends on project B, and project B has a compile
> time dependency on `org.apache.kafka:kafka-clients`. A relies on
> `org.apache.kafka:kafka_2.11` in its tests: it spawns some kafka brokers in
> the tests.
> * At first A and B both use kafka libraries 2.0.1, and everything works fine.
> * At some point a newer version of B upgrades
> `org.apache.kafka:kafka-clients` to 2.3.0.
> * When A wants to use the newer version of B, its tests are broken because
> kafka brokers fail to start: now `org.apache.kafka.common.utils.Utils`
> (2.3.0) throws NoSuchFileException while BrokerMetadataCheckPoint (2.0.1)
> expects to catch FileNotFoundException.
> It would be much more reliable for BrokerMetadataCheckPoint to check the
> file's existence before trying to load the properties from the file.

-- 
This message was sent by Atlassian JIRA
(v7.6.14#76016)
[jira] [Created] (KAFKA-8735) BrokerMetadataCheckPoint should check metadata.properties existence itself
Qinghui Xu created KAFKA-8735:
---------------------------------

             Summary: BrokerMetadataCheckPoint should check metadata.properties existence itself
                 Key: KAFKA-8735
                 URL: https://issues.apache.org/jira/browse/KAFKA-8735
             Project: Kafka
          Issue Type: Improvement
            Reporter: Qinghui Xu

BrokerMetadataCheckPoint tries to read metadata.properties from the log directory during server startup, and it relies on org.apache.kafka.common.utils.Utils (from org.apache.kafka:kafka-clients) to load the properties file in a given directory.

During the process, we need to handle the case in which the properties file does not exist (not as an error). Currently, BrokerMetadataCheckPoint relies on the behavior of `org.apache.kafka.common.utils.Utils#loadProps` to find out whether the file exists: if the properties file is absent, it expects a NoSuchFileException (for branch 2.1 and above), whereas it expected a FileNotFoundException (for branch 2.0 and before). Given that the `org.apache.kafka.common.utils.Utils#loadProps` signature throws only IOException, this exception pattern matching is a leak of abstraction that makes BrokerMetadataCheckPoint rely on the implementation details of `org.apache.kafka.common.utils.Utils#loadProps`.

This makes BrokerMetadataCheckPoint very fragile, especially when `org.apache.kafka.common.utils.Utils` and `kafka.server.BrokerMetadataCheckPoint` come from different artifacts. An example that I just ran into:
* We have a project A that depends on project B, and project B has a compile time dependency on `org.apache.kafka:kafka-clients`. A relies on `org.apache.kafka:kafka_2.11` in its tests: it spawns some kafka brokers in the tests.
* At first A and B both use kafka libraries 2.0.1, and everything works fine.
* At some point a newer version of B upgrades `org.apache.kafka:kafka-clients` to 2.3.0.
* When A wants to use the newer version of B, its tests are broken because kafka brokers fail to start: now `org.apache.kafka.common.utils.Utils` (2.3.0) throws NoSuchFileException while BrokerMetadataCheckPoint (2.0.1) expects to catch FileNotFoundException.

It would be much more reliable for BrokerMetadataCheckPoint to check the file's existence before trying to load the properties from the file.

-- 
This message was sent by Atlassian JIRA
(v7.6.14#76016)
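A minimal sketch of the suggested approach, assuming the checkpoint reader treats a missing properties file as "no metadata" instead of pattern-matching on the IOException subtype thrown by Utils#loadProps. The class name, file name, and Optional return type are illustrative, not the actual broker code:

{code:java}
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;
import java.util.Properties;

class MetadataCheckpointSketch {
    static Optional<Properties> read(String logDir) throws IOException {
        Path file = Paths.get(logDir, "metadata.properties");
        // Check existence explicitly: an absent file is a normal case, not an
        // error, and no longer depends on which IOException subclass is thrown.
        if (!Files.exists(file)) {
            return Optional.empty();
        }
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(file)) {
            props.load(in);
        }
        return Optional.of(props);
    }
}
{code}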
[jira] [Commented] (KAFKA-8705) NullPointerException was thrown by topology optimization when two MergeNodes have a common KeyChangingNode
[ https://issues.apache.org/jira/browse/KAFKA-8705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896500#comment-16896500 ]

Matthias J. Sax commented on KAFKA-8705:
----------------------------------------
[~bbejeck] Is 2.3 the only affected version?

> NullPointerException was thrown by topology optimization when two MergeNodes
> have a common KeyChangingNode
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-8705
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8705
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.0
>            Reporter: Hiroshi Nakahara
>            Assignee: Bill Bejeck
>            Priority: Major
>
> NullPointerException was thrown by topology optimization when two MergeNodes
> have a common KeyChangingNode.
> Kafka Streams version: 2.3.0
> h3. Code
> {code:java}
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.KStream;
>
> import java.util.Properties;
>
> public class Main {
>     public static void main(String[] args) {
>         final StreamsBuilder streamsBuilder = new StreamsBuilder();
>         final KStream<Integer, Integer> parentStream =
>             streamsBuilder.stream("parentTopic", Consumed.with(Serdes.Integer(), Serdes.Integer()))
>                 .selectKey(Integer::sum); // To make parentStream a key-changing point
>         final KStream<Integer, Integer> childStream1 = parentStream.mapValues(v -> v + 1);
>         final KStream<Integer, Integer> childStream2 = parentStream.mapValues(v -> v + 2);
>         final KStream<Integer, Integer> childStream3 = parentStream.mapValues(v -> v + 3);
>         childStream1
>             .merge(childStream2)
>             .merge(childStream3)
>             .to("outputTopic");
>         final Properties properties = new Properties();
>         properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
>         streamsBuilder.build(properties);
>     }
> }
> {code}
> h3. Expected result
> streamsBuilder.build should create the Topology without throwing an
> exception. The expected topology is:
> {code:java}
> Topologies:
>    Sub-topology: 0
>     Source: KSTREAM-SOURCE-00 (topics: [parentTopic])
>       --> KSTREAM-KEY-SELECT-01
>     Processor: KSTREAM-KEY-SELECT-01 (stores: [])
>       --> KSTREAM-MAPVALUES-02, KSTREAM-MAPVALUES-03, KSTREAM-MAPVALUES-04
>       <-- KSTREAM-SOURCE-00
>     Processor: KSTREAM-MAPVALUES-02 (stores: [])
>       --> KSTREAM-MERGE-05
>       <-- KSTREAM-KEY-SELECT-01
>     Processor: KSTREAM-MAPVALUES-03 (stores: [])
>       --> KSTREAM-MERGE-05
>       <-- KSTREAM-KEY-SELECT-01
>     Processor: KSTREAM-MAPVALUES-04 (stores: [])
>       --> KSTREAM-MERGE-06
>       <-- KSTREAM-KEY-SELECT-01
>     Processor: KSTREAM-MERGE-05 (stores: [])
>       --> KSTREAM-MERGE-06
>       <-- KSTREAM-MAPVALUES-02, KSTREAM-MAPVALUES-03
>     Processor: KSTREAM-MERGE-06 (stores: [])
>       --> KSTREAM-SINK-07
>       <-- KSTREAM-MERGE-05, KSTREAM-MAPVALUES-04
>     Sink: KSTREAM-SINK-07 (topic: outputTopic)
>       <-- KSTREAM-MERGE-06
> {code}
> h3. Actual result
> NullPointerException was thrown with the following stacktrace.
> {code:java}
> Exception in thread "main" java.lang.NullPointerException
>     at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
>     at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeUpdateKeyChangingRepartitionNodeMap(InternalStreamsBuilder.java:397)
>     at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeOptimizeRepartitionOperations(InternalStreamsBuilder.java:315)
>     at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybePerformOptimizations(InternalStreamsBuilder.java:304)
>     at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:275)
>     at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:558)
>     at Main.main(Main.java:24){code}
> h3. Cause
> This exception occurs in
> InternalStreamsBuilder#maybeUpdateKeyChangingRepartitionNodeMap.
> {code:java}
> private void maybeUpdateKeyChangingRepartitionNodeMap() {
>     final Map> mergeNodesToKeyChangers = new HashMap<>();
>     for (final StreamsGraphNode mergeNode : mergeNodes) {
>         mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>());
>         final Collection keys =
[jira] [Created] (KAFKA-8734) Remove PartitionAssignorAdapter and deprecated PartitionAssignor interface
Sophie Blee-Goldman created KAFKA-8734:
------------------------------------------

             Summary: Remove PartitionAssignorAdapter and deprecated PartitionAssignor interface
                 Key: KAFKA-8734
                 URL: https://issues.apache.org/jira/browse/KAFKA-8734
             Project: Kafka
          Issue Type: Task
          Components: clients
    Affects Versions: 3.0.0
            Reporter: Sophie Blee-Goldman

In 2.4 we deprecated the consumer.internal.PartitionAssignor interface and migrated all assignors to the new public consumer.ConsumerPartitionAssignor interface ([https://github.com/apache/kafka/pull/7108]). Although the old interface was internal, we provided an [adapter|https://github.com/apache/kafka/pull/7110] for those who may have implemented a custom PartitionAssignor, to avoid breaking changes. These should be removed in the next major release.

-- 
This message was sent by Atlassian JIRA
(v7.6.14#76016)
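For illustration, here is a sketch of what a custom assignor looks like against the public 2.4+ ConsumerPartitionAssignor interface mentioned above. The class name and the assignment policy are deliberately trivial stand-ins, not a recommended strategy:

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class FirstMemberAssignor implements ConsumerPartitionAssignor {

    @Override
    public String name() {
        return "first-member"; // identifies this assignor in the group protocol
    }

    @Override
    public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription) {
        Map<String, Assignment> assignments = new HashMap<>();
        boolean first = true;
        for (Map.Entry<String, Subscription> member : groupSubscription.groupSubscription().entrySet()) {
            List<TopicPartition> partitions = new ArrayList<>();
            if (first) { // hand every partition to the first member (toy policy)
                for (String topic : member.getValue().topics()) {
                    for (PartitionInfo p : metadata.partitionsForTopic(topic)) {
                        partitions.add(new TopicPartition(p.topic(), p.partition()));
                    }
                }
                first = false;
            }
            assignments.put(member.getKey(), new Assignment(partitions));
        }
        return new GroupAssignment(assignments);
    }
}
{code}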
[jira] [Commented] (KAFKA-7937) Flaky Test ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup
[ https://issues.apache.org/jira/browse/KAFKA-7937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896370#comment-16896370 ] Bruno Cadonna commented on KAFKA-7937: -- https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6601/testReport/ > Flaky Test ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup > > > Key: KAFKA-7937 > URL: https://issues.apache.org/jira/browse/KAFKA-7937 > Project: Kafka > Issue Type: Bug > Components: admin, clients, unit tests >Affects Versions: 2.2.0, 2.1.1, 2.3.0 >Reporter: Matthias J. Sax >Assignee: Gwen Shapira >Priority: Critical > Fix For: 2.4.0 > > Attachments: log-job6122.txt > > > To get stable nightly builds for `2.2` release, I create tickets for all > observed test failures. > https://builds.apache.org/blue/organizations/jenkins/kafka-2.2-jdk8/detail/kafka-2.2-jdk8/19/pipeline > {quote}kafka.admin.ResetConsumerGroupOffsetTest > > testResetOffsetsNotExistingGroup FAILED > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.CoordinatorNotAvailableException: The > coordinator is not available. at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at > kafka.admin.ConsumerGroupCommand$ConsumerGroupService.resetOffsets(ConsumerGroupCommand.scala:306) > at > kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup(ResetConsumerGroupOffsetTest.scala:89) > Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: > The coordinator is not available.{quote} -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8589) Flakey test ResetConsumerGroupOffsetTest#testResetOffsetsExistingTopic
[ https://issues.apache.org/jira/browse/KAFKA-8589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896369#comment-16896369 ] Bruno Cadonna commented on KAFKA-8589: -- https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6601/ > Flakey test ResetConsumerGroupOffsetTest#testResetOffsetsExistingTopic > -- > > Key: KAFKA-8589 > URL: https://issues.apache.org/jira/browse/KAFKA-8589 > Project: Kafka > Issue Type: Bug > Components: admin, clients, unit tests >Affects Versions: 2.4.0 >Reporter: Boyang Chen >Priority: Major > Labels: flaky-test > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/5724/consoleFull] > *20:25:15* > kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExistingTopic > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExistingTopic.test.stdout*20:25:15* > *20:25:15* kafka.admin.ResetConsumerGroupOffsetTest > > testResetOffsetsExistingTopic FAILED*20:25:15* > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.CoordinatorNotAvailableException: The > coordinator is not available.*20:25:15* at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)*20:25:15* > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)*20:25:15* > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)*20:25:15* > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)*20:25:15* > at > kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$resetOffsets$1(ConsumerGroupCommand.scala:379)*20:25:15* > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:160)*20:25:15* > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:160)*20:25:15* > at scala.collection.Iterator.foreach(Iterator.scala:941)*20:25:15* > at scala.collection.Iterator.foreach$(Iterator.scala:941)*20:25:15* > at > scala.collection.AbstractIterator.foreach(Iterator.scala:1429)*20:25:15* >at scala.collection.IterableLike.foreach(IterableLike.scala:74)*20:25:15* >at > scala.collection.IterableLike.foreach$(IterableLike.scala:73)*20:25:15* > at scala.collection.AbstractIterable.foreach(Iterable.scala:56)*20:25:15* > at > scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:160)*20:25:15* > at > scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:158)*20:25:15* > at > scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)*20:25:15* > at > kafka.admin.ConsumerGroupCommand$ConsumerGroupService.resetOffsets(ConsumerGroupCommand.scala:377)*20:25:15* > at > kafka.admin.ResetConsumerGroupOffsetTest.resetOffsets(ResetConsumerGroupOffsetTest.scala:507)*20:25:15* > at > kafka.admin.ResetConsumerGroupOffsetTest.resetAndAssertOffsets(ResetConsumerGroupOffsetTest.scala:477)*20:25:15* > at > kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExistingTopic(ResetConsumerGroupOffsetTest.scala:123)*20:25:15* > *20:25:15* Caused by:*20:25:15* > org.apache.kafka.common.errors.CoordinatorNotAvailableException: The > coordinator is not available.*20* -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8686) Flakey test ExampleConnectIntegrationTest#testSinkConnector
[ https://issues.apache.org/jira/browse/KAFKA-8686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896366#comment-16896366 ] Bruno Cadonna commented on KAFKA-8686: -- https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6601/testReport/ > Flakey test ExampleConnectIntegrationTest#testSinkConnector > --- > > Key: KAFKA-8686 > URL: https://issues.apache.org/jira/browse/KAFKA-8686 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect, unit tests >Affects Versions: 2.4.0 >Reporter: Boyang Chen >Priority: Major > Labels: flaky-test > > https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/429/console > org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSinkConnector > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/connect/runtime/build/reports/testOutput/org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSinkConnector.test.stdout*20:09:20* > *20:09:20* > org.apache.kafka.connect.integration.ExampleConnectIntegrationTest > > testSinkConnector FAILED*20:09:20* java.lang.AssertionError: Condition > not met within timeout 15000. Connector tasks were not assigned a partition > each.*20:09:20* at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376)*20:09:20* > at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:353)*20:09:20* > at > org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSinkConnector(ExampleConnectIntegrationTest.java:128) -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (KAFKA-8733) Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.
[ https://issues.apache.org/jira/browse/KAFKA-8733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana updated KAFKA-8733:
----------------------------------
    Description: 
We found the offline-partitions issue multiple times on some of the hosts in our clusters. After going through the broker logs and the hosts' disk stats, it looks like this issue occurs whenever the read/write operations take more time on that disk. In a particular case where the read time is more than replica.lag.time.max.ms, follower replicas will be out of sync because their earlier fetch requests are stuck while reading the local log and their fetch status is not yet updated, as shown in the below code of `ReplicaManager`. If there is an issue in reading the data from the log for a duration longer than replica.lag.time.max.ms, then all the replicas will be out of sync and the partition becomes offline if min.insync.replicas > 1 and unclean.leader.election is disabled.

{code:java}
def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
  val result = readFromLocalLog( // this call took more than `replica.lag.time.max.ms`
    replicaId = replicaId,
    fetchOnlyFromLeader = fetchOnlyFromLeader,
    readOnlyCommitted = fetchOnlyCommitted,
    fetchMaxBytes = fetchMaxBytes,
    hardMaxBytesLimit = hardMaxBytesLimit,
    readPartitionInfo = fetchInfos,
    quota = quota,
    isolationLevel = isolationLevel)
  // fetch time gets updated here, but maybeShrinkIsr may have already been
  // called and the replica removed from the ISR
  if (isFromFollower) updateFollowerLogReadResults(replicaId, result)
  else result
}
val logReadResults = readFromLog()
{code}

Attached are the graphs of disk weighted IO time stats when this issue occurred.

I will raise a [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-501+Avoid+offline+partitions+in+the+edgcase+scenario+of+follower+fetch+requests+not+processed+in+time] describing options on how to handle this scenario.

was: (the same description, without the sentence about the attached graphs)

> Offline partitions occur when leader's disk is slow in reads while responding
> to follower fetch requests.
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-8733
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8733
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.1.2, 2.4.0
>            Reporter: Satish Duggana
>            Assignee: Satish Duggana
>            Priority: Critical
>         Attachments: weighted-io-time-2.png, wio-time.png
>
> We found the offline-partitions issue multiple times on some of the hosts in
> our clusters. After going through the broker logs and the hosts' disk stats,
> it looks like this issue occurs whenever the read/write operations take more
> time on that disk. In a particular case where the read time is more than
> replica.lag.time.max.ms, follower replicas will be out of sync because their
> earlier fetch requests are stuck while reading the local log and their fetch
> status is not yet updated, as shown in the below code of `ReplicaManager`.
> If there is an issue in reading the data from the log for a duration longer
> than replica.lag.time.max.ms, then all the replicas will be out of sync
> and the partition becomes offline if min.insync.replicas > 1 and
> unclean.leader.election is disabled.
[jira] [Updated] (KAFKA-8733) Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.
[ https://issues.apache.org/jira/browse/KAFKA-8733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-8733: -- Attachment: wio-time.png weighted-io-time-2.png
> Offline partitions occur when leader's disk is slow in reads while responding
> to follower fetch requests.
> -
>
> Key: KAFKA-8733
> URL: https://issues.apache.org/jira/browse/KAFKA-8733
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 1.1.2, 2.4.0
> Reporter: Satish Duggana
> Assignee: Satish Duggana
> Priority: Critical
> Attachments: weighted-io-time-2.png, wio-time.png
>
>
> We found an offline-partitions issue multiple times on some of the hosts in our
> clusters. After going through the broker logs and the hosts' disk stats, it
> looks like this issue occurs whenever read/write operations take more time on
> that disk. In a particular case where the read time is more than
> replica.lag.time.max.ms, follower replicas will be out of sync because their
> earlier fetch requests are stuck reading the local log and their fetch status
> is not yet updated, as shown in the `ReplicaManager` code below. If reading
> data from the log takes longer than replica.lag.time.max.ms, all the replicas
> will be out of sync and the partition becomes offline if min.insync.replicas >
> 1 and unclean leader election is disabled.
>
> {code:java}
> def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
>   val result = readFromLocalLog( // this call took more than `replica.lag.time.max.ms`
>     replicaId = replicaId,
>     fetchOnlyFromLeader = fetchOnlyFromLeader,
>     readOnlyCommitted = fetchOnlyCommitted,
>     fetchMaxBytes = fetchMaxBytes,
>     hardMaxBytesLimit = hardMaxBytesLimit,
>     readPartitionInfo = fetchInfos,
>     quota = quota,
>     isolationLevel = isolationLevel)
>   // fetch time gets updated here, but maybeShrinkIsr may already have been
>   // called and the replica removed from the ISR
>   if (isFromFollower) updateFollowerLogReadResults(replicaId, result)
>   else result
> }
> val logReadResults = readFromLog()
> {code}
> I will raise a
> [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-501+Avoid+offline+partitions+in+the+edgcase+scenario+of+follower+fetch+requests+not+processed+in+time]
> describing options on how to handle this scenario.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
[jira] [Updated] (KAFKA-8733) Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.
[ https://issues.apache.org/jira/browse/KAFKA-8733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-8733: -- Description: We found an offline-partitions issue multiple times on some of the hosts in our clusters. After going through the broker logs and the hosts' disk stats, it looks like this issue occurs whenever read/write operations take more time on that disk. In a particular case where the read time is more than replica.lag.time.max.ms, follower replicas will be out of sync because their earlier fetch requests are stuck reading the local log and their fetch status is not yet updated, as shown in the `ReplicaManager` code below. If reading data from the log takes longer than replica.lag.time.max.ms, all the replicas will be out of sync and the partition becomes offline if min.insync.replicas > 1 and unclean leader election is disabled.
{code:java}
def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
  val result = readFromLocalLog( // this call took more than `replica.lag.time.max.ms`
    replicaId = replicaId,
    fetchOnlyFromLeader = fetchOnlyFromLeader,
    readOnlyCommitted = fetchOnlyCommitted,
    fetchMaxBytes = fetchMaxBytes,
    hardMaxBytesLimit = hardMaxBytesLimit,
    readPartitionInfo = fetchInfos,
    quota = quota,
    isolationLevel = isolationLevel)
  // fetch time gets updated here, but maybeShrinkIsr may already have been
  // called and the replica removed from the ISR
  if (isFromFollower) updateFollowerLogReadResults(replicaId, result)
  else result
}
val logReadResults = readFromLog()
{code}
I will raise a [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-501+Avoid+offline+partitions+in+the+edgcase+scenario+of+follower+fetch+requests+not+processed+in+time] describing options on how to handle this scenario.

was: We found an offline-partitions issue multiple times on some of the hosts in our clusters. After going through the broker logs and the hosts' disk stats, it looks like this issue occurs whenever read/write operations take more time on that disk. In a particular case where the read time is more than replica.lag.time.max.ms, follower replicas will be out of sync because their earlier fetch requests are stuck reading the local log and their fetch status is not yet updated, as shown in the `ReplicaManager` code below. If reading data from the log takes longer than replica.lag.time.max.ms, all the replicas will be out of sync and the partition becomes offline if min.insync.replicas > 1 and unclean leader election is disabled.
{code:java}
def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
  val result = readFromLocalLog( // this call took more than `replica.lag.time.max.ms`
    replicaId = replicaId,
    fetchOnlyFromLeader = fetchOnlyFromLeader,
    readOnlyCommitted = fetchOnlyCommitted,
    fetchMaxBytes = fetchMaxBytes,
    hardMaxBytesLimit = hardMaxBytesLimit,
    readPartitionInfo = fetchInfos,
    quota = quota,
    isolationLevel = isolationLevel)
  // fetch time gets updated here, but maybeShrinkIsr may already have been
  // called and the replica removed from the ISR
  if (isFromFollower) updateFollowerLogReadResults(replicaId, result)
  else result
}
val logReadResults = readFromLog()
{code}
I will raise a KIP describing options on how to handle this scenario.
> Offline partitions occur when leader's disk is slow in reads while responding
> to follower fetch requests.
> -
>
> Key: KAFKA-8733
> URL: https://issues.apache.org/jira/browse/KAFKA-8733
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 1.1.2, 2.4.0
> Reporter: Satish Duggana
> Assignee: Satish Duggana
> Priority: Critical
>
> We found an offline-partitions issue multiple times on some of the hosts in our
> clusters. After going through the broker logs and the hosts' disk stats, it
> looks like this issue occurs whenever read/write operations take more time on
> that disk. In a particular case where the read time is more than
> replica.lag.time.max.ms, follower replicas will be out of sync because their
> earlier fetch requests are stuck reading the local log and their fetch status
> is not yet updated, as shown in the `ReplicaManager` code below. If reading
> data from the log takes longer than replica.lag.time.max.ms, all the replicas
> will be out of sync and the partition becomes offline if min.insync.replicas >
> 1 and unclean leader election is disabled.
>
> {code:java}
> def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
>   val result = readFromLocalLog( // this call took more than `replica.lag.time.max.ms`
>     replicaId = replicaId,
>     fetchOnlyFromLeader = fetchOnlyFromLeader,
>     readOnlyCommitted = fetchOnlyCommitted,
>     fetchMaxBytes = fetchMaxBytes,
>     hardMaxBytesLimit = hardMaxBytesLimit,
>     readPartitionInfo = fetchInfos,
>     quota = quota,
>     isolationLevel = isolationLevel)
>   // fetch time gets updated here, but maybeShrinkIsr may already have been
>   // called and the replica removed from the ISR
>   if (isFromFollower) updateFollowerLogReadResults(replicaId, result)
>   else result
> }
> val logReadResults = readFromLog()
> {code}
> I will raise a
> [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-501+Avoid+offline+partitions+in+the+edgcase+scenario+of+follower+fetch+requests+not+processed+in+time]
> describing options on how to handle this scenario.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
[jira] [Created] (KAFKA-8733) Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.
Satish Duggana created KAFKA-8733: - Summary: Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests. Key: KAFKA-8733 URL: https://issues.apache.org/jira/browse/KAFKA-8733 Project: Kafka Issue Type: Bug Components: core Affects Versions: 1.1.2, 2.4.0 Reporter: Satish Duggana Assignee: Satish Duggana
We found an offline-partitions issue multiple times on some of the hosts in our clusters. After going through the broker logs and the hosts' disk stats, it looks like this issue occurs whenever read/write operations take more time on that disk. In a particular case where the read time is more than replica.lag.time.max.ms, follower replicas will be out of sync because their earlier fetch requests are stuck reading the local log and their fetch status is not yet updated, as shown in the `ReplicaManager` code below. If reading data from the log takes longer than replica.lag.time.max.ms, all the replicas will be out of sync and the partition becomes offline if min.insync.replicas > 1 and unclean leader election is disabled.
{code:java}
def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
  val result = readFromLocalLog( // this call took more than `replica.lag.time.max.ms`
    replicaId = replicaId,
    fetchOnlyFromLeader = fetchOnlyFromLeader,
    readOnlyCommitted = fetchOnlyCommitted,
    fetchMaxBytes = fetchMaxBytes,
    hardMaxBytesLimit = hardMaxBytesLimit,
    readPartitionInfo = fetchInfos,
    quota = quota,
    isolationLevel = isolationLevel)
  // fetch time gets updated here, but maybeShrinkIsr may already have been
  // called and the replica removed from the ISR
  if (isFromFollower) updateFollowerLogReadResults(replicaId, result)
  else result
}
val logReadResults = readFromLog()
{code}
I will raise a KIP describing options on how to handle this scenario.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
[jira] [Resolved] (KAFKA-8717) Use cached hw/lso offset metadata when reading from log
[ https://issues.apache.org/jira/browse/KAFKA-8717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-8717. Resolution: Fixed Fix Version/s: 2.4.0
> Use cached hw/lso offset metadata when reading from log
> ---
>
> Key: KAFKA-8717
> URL: https://issues.apache.org/jira/browse/KAFKA-8717
> Project: Kafka
> Issue Type: Improvement
> Reporter: Jason Gustafson
> Assignee: Jason Gustafson
> Priority: Major
> Fix For: 2.4.0
>
>
> The broker caches log offset metadata (e.g. segment position) for the high
> watermark and last stable offset in order to avoid additional index lookups
> when handling fetches. Currently this metadata is only used when determining
> delayed fetch satisfaction. We can also use it when reading from the log in
> order to avoid redundant index lookups.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
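The gist of the change, as a rough sketch (the case class and method names below are illustrative stand-ins, not the code from the PR): a read bounded by the high watermark can reuse the cached segment position instead of going back through the offset index.
{code:java}
object OffsetMetadataSketch {
  // Simplified stand-in for the broker's cached offset metadata.
  case class OffsetMetadata(messageOffset: Long, segmentBaseOffset: Long, positionInSegment: Int)

  class Log(private var hwMetadata: OffsetMetadata) {
    // If the fetch starts exactly at the high watermark, its segment position is
    // already cached; only other offsets need an index lookup.
    def startMetadata(fetchOffset: Long): OffsetMetadata =
      if (fetchOffset == hwMetadata.messageOffset) hwMetadata
      else lookupViaIndex(fetchOffset)

    private def lookupViaIndex(offset: Long): OffsetMetadata =
      OffsetMetadata(offset, 0L, 0) // placeholder for the real offset-index lookup
  }
}
{code}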
[jira] [Commented] (KAFKA-8717) Use cached hw/lso offset metadata when reading from log
[ https://issues.apache.org/jira/browse/KAFKA-8717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896251#comment-16896251 ] ASF GitHub Bot commented on KAFKA-8717: --- hachikuji commented on pull request #7081: KAFKA-8717: Reuse cached offset metadata when reading from log URL: https://github.com/apache/kafka/pull/7081 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
> Use cached hw/lso offset metadata when reading from log
> ---
>
> Key: KAFKA-8717
> URL: https://issues.apache.org/jira/browse/KAFKA-8717
> Project: Kafka
> Issue Type: Improvement
> Reporter: Jason Gustafson
> Assignee: Jason Gustafson
> Priority: Major
>
> The broker caches log offset metadata (e.g. segment position) for the high
> watermark and last stable offset in order to avoid additional index lookups
> when handling fetches. Currently this metadata is only used when determining
> delayed fetch satisfaction. We can also use it when reading from the log in
> order to avoid redundant index lookups.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
[jira] [Resolved] (KAFKA-8442) Inconsistent ISR output in topic command when using --bootstrap-server
[ https://issues.apache.org/jira/browse/KAFKA-8442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-8442. Resolution: Fixed Fix Version/s: 2.4.0
> Inconsistent ISR output in topic command when using --bootstrap-server
> --
>
> Key: KAFKA-8442
> URL: https://issues.apache.org/jira/browse/KAFKA-8442
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Assignee: huxihx
> Priority: Major
> Fix For: 2.4.0
>
>
> If there is no leader for a partition, the Metadata API returns an empty ISR.
> When using the `--bootstrap-server` option with `kafka-topics.sh`, this leads
> to the following output:
> {code}
> Topic:foo  PartitionCount:1  ReplicationFactor:2  Configs:segment.bytes=1073741824
>   Topic: foo  Partition: 0  Leader: none  Replicas: 1,3  Isr:
> {code}
> When using `--zookeeper`, we display the current ISR correctly:
> {code}
> Topic:foo  PartitionCount:1  ReplicationFactor:2  Configs:
>   Topic: foo  Partition: 0  Leader: -1  Replicas: 1,3  Isr: 1
> {code}
> To avoid confusion, we should make this output consistent or at least not
> misleading. We should either change the Metadata API to print the ISR when we
> have it or we can change the output of the topic command to `N/A` or
> something like that.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
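The `N/A` suggestion could be as small as this sketch (illustrative only, not the committed fix): distinguish an unknown ISR, i.e. a leaderless partition reported over the Metadata API, from a genuinely empty one when formatting the output.
{code:java}
object IsrFormatSketch {
  // Render an unknown ISR differently from an empty one so the two cases are
  // not conflated in the kafka-topics.sh output.
  def formatIsr(leader: Option[Int], isr: Seq[Int]): String =
    if (leader.isEmpty && isr.isEmpty) "N/A" // no leader, so the ISR is unknown
    else isr.mkString(",")
}
{code}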
[jira] [Commented] (KAFKA-8442) Inconsistent ISR output in topic command when using --bootstrap-server
[ https://issues.apache.org/jira/browse/KAFKA-8442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896248#comment-16896248 ] ASF GitHub Bot commented on KAFKA-8442: --- hachikuji commented on pull request #6836: KAFKA-8442:Inconsistent ISR output in topic command URL: https://github.com/apache/kafka/pull/6836 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
> Inconsistent ISR output in topic command when using --bootstrap-server
> --
>
> Key: KAFKA-8442
> URL: https://issues.apache.org/jira/browse/KAFKA-8442
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Assignee: huxihx
> Priority: Major
>
> If there is no leader for a partition, the Metadata API returns an empty ISR.
> When using the `--bootstrap-server` option with `kafka-topics.sh`, this leads
> to the following output:
> {code}
> Topic:foo  PartitionCount:1  ReplicationFactor:2  Configs:segment.bytes=1073741824
>   Topic: foo  Partition: 0  Leader: none  Replicas: 1,3  Isr:
> {code}
> When using `--zookeeper`, we display the current ISR correctly:
> {code}
> Topic:foo  PartitionCount:1  ReplicationFactor:2  Configs:
>   Topic: foo  Partition: 0  Leader: -1  Replicas: 1,3  Isr: 1
> {code}
> To avoid confusion, we should make this output consistent or at least not
> misleading. We should either change the Metadata API to print the ISR when we
> have it or we can change the output of the topic command to `N/A` or
> something like that.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
[jira] [Resolved] (KAFKA-8640) Replace OffsetFetch request/response with automated protocol
[ https://issues.apache.org/jira/browse/KAFKA-8640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-8640. Resolution: Fixed > Replace OffsetFetch request/response with automated protocol > > > Key: KAFKA-8640 > URL: https://issues.apache.org/jira/browse/KAFKA-8640 > Project: Kafka > Issue Type: Sub-task >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
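For reference, the automated protocol generates the request/response data classes from JSON message definitions, so per-version serialization is no longer hand-written. A sketch of what that looks like to callers; the accessor name below follows the generator's field-naming conventions and is an assumption here, not verified against the committed schema:
{code:java}
import org.apache.kafka.common.message.OffsetFetchRequestData

object AutomatedProtocolSketch {
  // The class and its setters are generated from the OffsetFetchRequest JSON
  // definition; callers populate fields and the framework handles the wire format.
  val request: OffsetFetchRequestData = new OffsetFetchRequestData().setGroupId("my-group")
}
{code}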
[jira] [Commented] (KAFKA-8640) Replace OffsetFetch request/response with automated protocol
[ https://issues.apache.org/jira/browse/KAFKA-8640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896230#comment-16896230 ] ASF GitHub Bot commented on KAFKA-8640: --- hachikuji commented on pull request #7062: KAFKA-8640: Replace OffsetFetch request with automated protocol URL: https://github.com/apache/kafka/pull/7062 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Replace OffsetFetch request/response with automated protocol > > > Key: KAFKA-8640 > URL: https://issues.apache.org/jira/browse/KAFKA-8640 > Project: Kafka > Issue Type: Sub-task >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8711) Kafka 2.3.0 Transient Unit Test Failures SocketServerTest. testControlPlaneRequest
[ https://issues.apache.org/jira/browse/KAFKA-8711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896180#comment-16896180 ] Chandrasekhar commented on KAFKA-8711: -- Any feedback?
> Kafka 2.3.0 Transient Unit Test Failures SocketServerTest.
> testControlPlaneRequest
> --
>
> Key: KAFKA-8711
> URL: https://issues.apache.org/jira/browse/KAFKA-8711
> Project: Kafka
> Issue Type: Test
> Components: core, unit tests
> Affects Versions: 2.3.0
> Reporter: Chandrasekhar
> Priority: Critical
> Attachments: KafkaAUTFailures07242019_PASS2.txt,
> KafkaUTFailures07242019_PASS2.GIF
>
>
> Cloned Kafka 2.3.0 source to our git repo and compiled it using 'gradle
> build'; we see the following error consistently:
> Gradle Version 4.7
>
> testControlPlaneRequest
> java.net.BindException: Address already in use (Bind failed)
> at java.net.PlainSocketImpl.socketBind(Native Method)
> at java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387)
> at java.net.Socket.bind(Socket.java:644)
> at java.net.Socket.<init>(Socket.java:433)
> at java.net.Socket.<init>(Socket.java:286)
> at kafka.network.SocketServerTest.connect(SocketServerTest.scala:140)
> at kafka.network.SocketServerTest.$anonfun$testControlPlaneRequest$1(SocketServerTest.scala:200)
> at kafka.network.SocketServerTest.$anonfun$testControlPlaneRequest$1$adapted(SocketServerTest.scala:199)
> at kafka.network.SocketServerTest.withTestableServer(SocketServerTest.scala:1141)
> at kafka.network.SocketServerTest.testControlPlaneRequest(SocketServerTest.scala:199)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
> at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
> at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
> at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
> at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
> at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
> at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
> at sun.reflect.GeneratedMethodAccessor31.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
> at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
> at
[jira] [Created] (KAFKA-8732) specifying a non-existent broker to ./bin/kafka-reassign-partitions.sh leads to reassignment never getting completed.
Raunak created KAFKA-8732: - Summary: specifying a non-existent broker to ./bin/kafka-reassign-partitions.sh leads to reassignment never getting completed. Key: KAFKA-8732 URL: https://issues.apache.org/jira/browse/KAFKA-8732 Project: Kafka Issue Type: Bug Components: controller, tools Affects Versions: 0.10.1.1 Environment: Ubuntu-VERSION="14.04.5 LTS" Reporter: Raunak
Specifying a non-existent broker to ./bin/kafka-reassign-partitions.sh leads to the reassignment never getting completed. My reassignment gets stuck if I provide a non-existent broker ID. My Kafka version is 0.10.1.1.
{code:java}
./kafka-reassign-partitions.sh --zookeeper zk:2181 --reassignment-json-file le.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"cv-topic","partition":0,"replicas":[1011131,101067,98,101240]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
{code}
Here, 98 is the non-existent broker. Deleting the reassign_partitions znode does not help either: when I describe the same topic, broker 98 is out of sync.
{code:java}
Topic:cv-topic  PartitionCount:1  ReplicationFactor:4  Configs:
  Topic: cv-topic  Partition: 0  Leader: 1011131  Replicas: 1011131,101067,98,101240  Isr: 1011131,101067,101240
{code}
Now 98 will always be out of sync.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
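A guard of the kind the tool could apply before starting the reassignment, sketched with hypothetical names (liveBrokerIds would come from /brokers/ids in ZooKeeper):
{code:java}
object ReassignmentGuardSketch {
  // Reject unknown broker ids up front instead of letting the reassignment hang.
  def validate(proposed: Map[(String, Int), Seq[Int]], liveBrokerIds: Set[Int]): Unit = {
    val unknown = proposed.values.flatten.toSet -- liveBrokerIds
    require(unknown.isEmpty, s"Unknown broker ids in reassignment: ${unknown.mkString(",")}")
  }
}
{code}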
[jira] [Commented] (KAFKA-821) specifying a non-existent partition to ./bin/kafka-reassign-partitions.sh breaks all reassignment ops
[ https://issues.apache.org/jira/browse/KAFKA-821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896003#comment-16896003 ] Raunak commented on KAFKA-821: -- [~nehanarkhede] I don't think this is working. My reassignment gets stuck if I provide a non-existent broker ID. My Kafka version is 0.10.1.1.
{code:java}
./kafka-reassign-partitions.sh --zookeeper zk:2181 --reassignment-json-file le.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"cv-topic","partition":0,"replicas":[1011131,101067,98,101240]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
{code}
Here, 98 is the non-existent broker. Deleting the reassign_partitions znode does not help either: when I describe the same topic, broker 98 is out of sync.
{code:java}
Topic:cv-topic  PartitionCount:1  ReplicationFactor:4  Configs:
  Topic: cv-topic  Partition: 0  Leader: 1011131  Replicas: 1011131,101067,98,101240  Isr: 1011131,101067,101240
{code}
Now 98 will always be out of sync.
> specifying a non-existent partition to ./bin/kafka-reassign-partitions.sh
> breaks all reassignment ops
> -
>
> Key: KAFKA-821
> URL: https://issues.apache.org/jira/browse/KAFKA-821
> Project: Kafka
> Issue Type: Sub-task
> Components: controller, tools
> Affects Versions: 0.8.0
> Reporter: Scott Clasen
> Assignee: Swapnil Ghike
> Priority: Blocker
> Labels: kafka-0.8, p1
>
> From my ML post... the workaround is to manually delete
> /admin/reassign_partitions from ZK; that node contained a JSON with an empty
> partitions array.
> Have 3 brokers running. Ids 25,26,27
> ./bin/kafka-create-topic.sh --replica 3 --topic first-cluster-topic --zookeeper :2181/kafka
> Seems fine, can send/receive, etc.
> Kill 27, start 28. Try to reassign the single-partition topic with the
> following JSON. It contains an error: the partition should be 0, not 1.
> {"partitions":
> [{"topic": "first-cluster-topic", "partition": 1, "replicas": [25,26,28] }]
> }
> ./bin/kafka-reassign-partitions.sh --zookeeper ... -path-to-json-file reassign.json
> [2013-03-21 12:14:46,170] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
> [2013-03-21 12:14:46,310] ERROR Skipping reassignment of partition [first-cluster-topic,1] since it doesn't exist (kafka.admin.ReassignPartitionsCommand)
> Successfully started reassignment of partitions Map([first-cluster-topic,1] -> List(25, 26, 28))
> [2013-03-21 12:14:46,665] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
> [2013-03-21 12:14:46,780] INFO Session: 0x13d8a63a3760007 closed (org.apache.zookeeper.ZooKeeper)
> [2013-03-21 12:14:46,780] INFO EventThread shut down (org.apache.zookeeper.ClientCnxn)
> Ok, fix the JSON:
> {"partitions":
> [{"topic": "first-cluster-topic", "partition": 0, "replicas": [25,26,28] }]
> }
> ./bin/kafka-reassign-partitions.sh --zookeeper ... -path-to-json-file reassign.json
> [2013-03-21 12:17:34,367] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
> Partitions reassignment failed due to Partition reassignment currently in progress for Map(). Aborting operation
> kafka.common.AdminCommandFailedException: Partition reassignment currently in progress for Map(). Aborting operation
> at kafka.admin.ReassignPartitionsCommand.reassignPartitions(ReassignPartitionsCommand.scala:91)
> at kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:65)
> at kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala)
> ./bin/kafka-check-reassignment-status.sh --zookeeper ... --path-to-json-file reassign.json
> [2013-03-21 12:20:40,607] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
> Exception in thread "main" java.lang.ClassCastException:
> scala.collection.immutable.Map$Map1 cannot be cast to [Lscala.collection.Map;
> at kafka.admin.CheckReassignmentStatus$.main(CheckReassignmentStatus.scala:44)
> at kafka.admin.CheckReassignmentStatus.main(CheckReassignmentStatus.scala)
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
[jira] [Commented] (KAFKA-2758) Improve Offset Commit Behavior
[ https://issues.apache.org/jira/browse/KAFKA-2758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16895953#comment-16895953 ] Omkar Mestry commented on KAFKA-2758: - Is this issue still open, and if open, can I assign it to myself?
> Improve Offset Commit Behavior
> --
>
> Key: KAFKA-2758
> URL: https://issues.apache.org/jira/browse/KAFKA-2758
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Reporter: Guozhang Wang
> Priority: Major
> Labels: newbie, reliability
>
> There are two scenarios of offset committing that we can improve:
> 1) We can filter out the partitions whose committed offset is equal to the
> consumed offset, meaning there are no newly consumed messages from those
> partitions, and hence we do not need to include them in the commit request.
> 2) We can make a commit request right after resetting to a fetch/consume
> position, either according to the reset policy (e.g. on consumer startup,
> or when handling an out-of-range offset, etc.) or through `seek`, so that if
> the consumer fails right after these events, upon recovery it can restart
> from the reset position instead of resetting again: otherwise this can lead
> to, for example, data loss if we use "largest" as the reset policy while
> there are new messages coming to the fetched partitions.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
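For improvement 1, the filter could be as simple as this sketch (simplified types with partitions as strings; not the consumer's internal code):
{code:java}
object CommitFilterSketch {
  // Keep only partitions whose consumed position differs from the last committed
  // offset; unchanged partitions add nothing to the commit request.
  def offsetsToCommit(consumed: Map[String, Long], committed: Map[String, Long]): Map[String, Long] =
    consumed.filter { case (partition, offset) => !committed.get(partition).contains(offset) }
}
{code}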
[jira] [Commented] (KAFKA-1566) Kafka environment configuration (kafka-env.sh)
[ https://issues.apache.org/jira/browse/KAFKA-1566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16895901#comment-16895901 ] Sönke Liebau commented on KAFKA-1566: - Is this still something we are interested in doing? I just closed KAFKA-1234 as a duplicate of this, because it had no activity for even longer than this ticket. Personally I think this is not really necessary, as most deployments will run from service-runner tools like Upstart or systemd, which provide methods for setting environment variables. At the same time, it also won't hurt and might be useful for some people, so there is no real reason not to do it.
> Kafka environment configuration (kafka-env.sh)
> --
>
> Key: KAFKA-1566
> URL: https://issues.apache.org/jira/browse/KAFKA-1566
> Project: Kafka
> Issue Type: Improvement
> Components: tools
> Reporter: Cosmin Lehene
> Assignee: Sriharsha Chintalapani
> Priority: Major
> Labels: newbie, windows
> Attachments: KAFKA-1566.patch, KAFKA-1566_2015-02-21_21:57:02.patch,
> KAFKA-1566_2015-03-17_17:01:38.patch, KAFKA-1566_2015-03-17_17:19:23.patch
>
>
> It would be useful (especially for automated deployments) to have an
> environment configuration file that could be sourced from the launcher files
> (e.g. kafka-run-server.sh).
> This is how kafka-env.sh could look:
> {code}
> export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseCompressedOops -XX:+DisableExplicitGC -Djava.awt.headless=true \
>   -XX:+UseG1GC -XX:PermSize=48m -XX:MaxPermSize=48m -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"
> export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
> export KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=/var/log/kafka"
> {code}
> kafka-server-start.sh:
> {code}
> ...
> source $base_dir/config/kafka-env.sh
> ...
> {code}
> This approach is consistent with Hadoop and HBase. However the idea here is
> to be able to set these values in a single place without having to edit
> startup scripts.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
[jira] [Resolved] (KAFKA-1111) Broker prematurely accepts TopicMetadataRequests on startup
[ https://issues.apache.org/jira/browse/KAFKA-1111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sönke Liebau resolved KAFKA-1111. - Resolution: Abandoned Closing as abandoned after no objections on the dev list. If this is indeed still an issue we can always reopen this.
> Broker prematurely accepts TopicMetadataRequests on startup
> ---
>
> Key: KAFKA-1111
> URL: https://issues.apache.org/jira/browse/KAFKA-1111
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Rosenberg
> Assignee: Neha Narkhede
> Priority: Major
>
> I have an issue where on startup, the broker starts accepting
> TopicMetadataRequests before it has had metadata synced from the controller.
> This results in a bunch of log entries that look like this:
> [2013-11-01 03:26:01,577] INFO [kafka-request-handler-0] admin.AdminUtils$ - Topic creation { "partitions":{ "0":[ 9, 10 ] }, "version":1 }
> [2013-11-01 03:26:07,767] INFO [kafka-request-handler-1] admin.AdminUtils$ - Topic creation { "partitions":{ "0":[ 9, 11 ] }, "version":1 }
> [2013-11-01 03:26:07,823] INFO [kafka-request-handler-1] admin.AdminUtils$ - Topic creation { "partitions":{ "0":[ 10, 11 ] }, "version":1 }
> [2013-11-01 03:26:11,183] INFO [kafka-request-handler-2] admin.AdminUtils$ - Topic creation { "partitions":{ "0":[ 10, 11 ] }, "version":1 }
> From an email thread, Neha remarks:
> Before a broker receives the first
> LeaderAndIsrRequest/UpdateMetadataRequest, it is technically not ready to
> start serving any request. But it still ends up serving
> TopicMetadataRequest, which can re-create topics accidentally. It shouldn't
> succeed, but this is still a problem.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
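The fix shape the ticket implies amounts to gating metadata serving on controller state, roughly like this sketch (hypothetical names, not Kafka code):
{code:java}
object MetadataReadinessSketch {
  // Refuse TopicMetadataRequests until the first controller update has
  // populated the broker's metadata cache.
  @volatile private var ready = false

  def markReady(): Unit = ready = true // call on the first LeaderAndIsrRequest/UpdateMetadataRequest

  def checkReady(): Unit =
    if (!ready) throw new IllegalStateException("metadata cache not yet initialized by controller")
}
{code}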
[jira] [Resolved] (KAFKA-1099) StopReplicaRequest and StopReplicaResponse should also carry the replica ids
[ https://issues.apache.org/jira/browse/KAFKA-1099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sönke Liebau resolved KAFKA-1099. - Resolution: Abandoned Closing this as abandoned after asking for feedback on the dev list and receiving no objections.
> StopReplicaRequest and StopReplicaResponse should also carry the replica ids
>
>
> Key: KAFKA-1099
> URL: https://issues.apache.org/jira/browse/KAFKA-1099
> Project: Kafka
> Issue Type: Bug
> Components: controller
> Affects Versions: 0.8.1
> Reporter: Neha Narkhede
> Assignee: Neha Narkhede
> Priority: Major
>
> The stop replica request and response only contain a list of partitions for
> which a replica should be moved to the offline/nonexistent state. But the
> replica id information is implicit in the network layer as the receiving
> broker. This complicates stop-replica response handling on the controller.
> This blocks the right fix for KAFKA-1097, since that fix requires invoking a
> callback when processing a StopReplicaResponse, which in turn requires
> knowing the replica id from the StopReplicaResponse.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
[jira] [Resolved] (KAFKA-1016) Broker should limit purgatory size
[ https://issues.apache.org/jira/browse/KAFKA-1016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sönke Liebau resolved KAFKA-1016. - Resolution: Not A Problem Closing this as "not a problem"; I believe the purgatory redesign should help with the issue described here to a large extent.
> Broker should limit purgatory size
> --
>
> Key: KAFKA-1016
> URL: https://issues.apache.org/jira/browse/KAFKA-1016
> Project: Kafka
> Issue Type: Bug
> Components: purgatory
> Affects Versions: 0.8.0
> Reporter: Chris Riccomini
> Assignee: Joel Koshy
> Priority: Major
>
> I recently ran into a case where a poorly configured Kafka consumer was able
> to trigger out-of-memory exceptions in multiple Kafka brokers. The consumer
> was configured to have a fetcher.max.wait of Int.MaxInt.
> For low-volume topics, this configuration causes the consumer to block
> frequently, and for long periods of time. [~junrao] informs me that the fetch
> request will time out after the socket timeout is reached. In our case, this
> was set to 30s.
> With several thousand consumer threads, the fetch request purgatory got into
> the 100,000-400,000 range, which we believe triggered the out-of-memory
> exception. [~nehanarkhede] claims to have seen similar behavior in other
> high-volume clusters.
> It kind of seems like a bad thing that a poorly configured consumer can
> trigger out-of-memory exceptions in the broker. I was thinking maybe it makes
> sense to have the broker try and protect itself from this situation. Here are
> some potential solutions:
> 1. Have a broker-side max wait config for fetch requests.
> 2. Threshold the purgatory size, and either drop the oldest connections in
> purgatory, or reject the newest fetch requests when purgatory is full.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
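Option 2 might look like this minimal sketch (illustrative only; the real purgatory tracks watches per request key and is far more involved):
{code:java}
import scala.collection.mutable

object BoundedPurgatorySketch {
  // Cap the number of parked requests; when full, the caller answers the fetch
  // immediately (possibly with an empty response) instead of parking it.
  class BoundedPurgatory[T](maxSize: Int) {
    private val pending = mutable.Queue.empty[T]

    def tryWatch(request: T): Boolean = synchronized {
      if (pending.size >= maxSize) false
      else { pending.enqueue(request); true }
    }
  }
}
{code}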
[jira] [Resolved] (KAFKA-822) Reassignment of partitions needs a cleanup
[ https://issues.apache.org/jira/browse/KAFKA-822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sönke Liebau resolved KAFKA-822. Resolution: Abandoned Closing this as abandoned after asking for feedback on the dev list. It's probably also fixed, but I am not absolutely sure of that.
> Reassignment of partitions needs a cleanup
> --
>
> Key: KAFKA-822
> URL: https://issues.apache.org/jira/browse/KAFKA-822
> Project: Kafka
> Issue Type: Bug
> Components: controller, tools
> Affects Versions: 0.8.0
> Reporter: Swapnil Ghike
> Assignee: Swapnil Ghike
> Priority: Major
> Labels: bugs
>
> 1. This is probably a left-over from when the ReassignPartitionsCommand used
> to be blocking:
> Currently, for each partition that is reassigned, the controller deletes the
> /admin/reassign_partitions zk path and repopulates it with a new list with
> the reassigned partition removed from the original list. This is probably
> overkill, and we can delete the zk path completely once the reassignment of
> all partitions has completed, successfully or in error.
> 2. It will help to clarify that there can be no replicas that have started
> and are not in the ISR when KafkaController.onPartitionReassignment() is
> called.
> 3. We should batch the requests in
> KafkaController.StopOldReplicasOfReassignedPartition().
> 4. Update controllerContext.partitionReplicaAssignment only once in
> KafkaController.updateAssignedReplicasForPartition().
> 5. Need to test thoroughly.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
[jira] [Commented] (KAFKA-7376) After Kafka upgrade to v2.0.0 , Controller unable to communicate with brokers on SASL_SSL
[ https://issues.apache.org/jira/browse/KAFKA-7376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16895844#comment-16895844 ] Aldan Brito commented on KAFKA-7376: Hi [~ijuma], we are also facing a similar issue with SSL hostname verification.
Scenario: we have two Kafka listeners, internal and external. The internal listener is mapped to the FQDN of the broker, e.g. internal://FQDN:9092. The external listener is mapped to a user-defined name, e.g. external://testkafka:8109.
While generating the SSL certificates, we used the FQDN of the broker as the CN, and both listener names are included in the SAN entries. When a client does a handshake with the external listener, i.e. the producer's broker-list config set to external://testkafka:8109, we get the exception below.
{code:java}
Caused by: java.security.cert.CertificateException: No name matching testkafka found
    at sun.security.util.HostnameChecker.matchDNS(HostnameChecker.java:231)
    at sun.security.util.HostnameChecker.match(HostnameChecker.java:96)
    at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455)
    at sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:436)
    at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:252)
    at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:136)
    at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1626)
{code}
If we disable ssl.endpoint.identification.algorithm for the external listener, the handshake goes through fine. If both the internal and external listeners use the FQDN, with certificates generated using the FQDN as the CN, e.g. internal://FQDN:9092 and external://FQDN:8109, then a client request with the producer's broker-list config set to external://FQDN:8109 works fine.
It looks like the broker-list DNS domain name is verified against the CN name and the SAN entries are not considered.
Decoded server keystore snapshot:
{code:java}
Signature algorithm name: SHA256withRSA
Subject Public Key Algorithm: 2048-bit RSA key
Version: 3
Extensions:
#1: ObjectId: 2.5.29.19 Criticality=false
BasicConstraints:[
  CA:true
  PathLen:2147483647
]
#2: ObjectId: 2.5.29.37 Criticality=false
ExtendedKeyUsages [
  serverAuth
  clientAuth
]
#3: ObjectId: 2.5.29.15 Criticality=false
KeyUsage [
  DigitalSignature
  Non_repudiation
  Key_Encipherment
]
#4: ObjectId: 2.5.29.17 Criticality=false
SubjectAlternativeName [
  DNSName: kf-mykaf-0.kf-mykaf-headless.default.svc.cluster.local
  DNSName: testkafka
]
{code}
> After Kafka upgrade to v2.0.0, Controller unable to communicate with brokers
> on SASL_SSL
> -
>
> Key: KAFKA-7376
> URL: https://issues.apache.org/jira/browse/KAFKA-7376
> Project: Kafka
> Issue Type: Bug
> Components: controller
> Affects Versions: 2.0.0
> Reporter: Sridhar
> Priority: Major
>
> Hi,
> We upgraded our Kafka cluster (3x nodes running on AWS cloud) to version
> 2.0.0 and enabled security with SASL_SSL (plain) encryption for
> inter-broker and client connections.
> But there are a lot of errors in the controller log for the inter-broker
> communication. I have followed exactly the same steps as mentioned in the
> document and set all Kafka brokers' FQDN hostnames in the SAN
> (SubjectAlternativeName) of my server certificate (self-signed).
> [http://kafka.apache.org/documentation.html#security|http://example.com/]
>
> openssl s_client -connect kafka-3:9093
> CONNECTED(0003)
> depth=1
> Noticed someone else also facing a similar problem.
> [https://github.com/confluentinc/common/issues/158]
>
>
> {noformat}
> Server Configuration :
> listeners=PLAINTEXT://kafka-3:9092,SASL_SSL://kafka-3:9093
> advertised.listeners=PLAINTEXT://kafka-3:9092,SASL_SSL://kafka-3:9093
> #Security
> authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
> allow.everyone.if.no.acl.found=false
> security.inter.broker.protocol=SASL_SSL
> sasl.mechanism.inter.broker.protocol=PLAIN
> sasl.enabled.mechanisms=PLAIN
> super.users=User:admin
> ssl.client.auth=required
> ssl.endpoint.identification.algorithm=
> ssl.truststore.location=/etc/kafka/ssl/kafka.server.truststore.jks
> ssl.truststore.password=
> ssl.keystore.location=/etc/kafka/ssl/kafka.server.keystore.jks
> ssl.keystore.password=
> ssl.key.password=
> #Zookeeper
> zookeeper.connect=zk-1:2181,zk-2:2181,zk-3:2181
> zookeeper.connection.timeout.ms=6000
> {noformat}
>
>
> {code:java}
> [2018-09-04 12:02:57,289] WARN [RequestSendThread controllerId=2] Controller
> 2's connection to broker kafka-3:9093 (id: 3 rack: eu-central-1c) was
> unsuccessful (kafka.controller.RequestSendThread)
> org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake
> failed
> Caused by:
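The behavior described is standard JSSE endpoint identification: with the HTTPS algorithm set, the client matches the requested hostname against SAN dNSName entries, and the CN is only consulted when no DNS SANs are present. A minimal sketch of where that switch lives on the client side (plain JDK API, independent of Kafka, whose ssl.endpoint.identification.algorithm config feeds the same parameter):
{code:java}
import javax.net.ssl.{SSLContext, SSLEngine}

object EndpointIdentificationSketch {
  // Build a client-mode engine with hostname verification enabled; "HTTPS"
  // makes the trust manager check the host against the certificate's SANs.
  def clientEngine(host: String, port: Int): SSLEngine = {
    val engine = SSLContext.getDefault.createSSLEngine(host, port)
    engine.setUseClientMode(true)
    val params = engine.getSSLParameters
    params.setEndpointIdentificationAlgorithm("HTTPS")
    engine.setSSLParameters(params)
    engine
  }
}
{code}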