[jira] [Created] (KAFKA-6082) consider fencing zookeeper updates with controller epoch zkVersion
Onur Karaman created KAFKA-6082: --- Summary: consider fencing zookeeper updates with controller epoch zkVersion Key: KAFKA-6082 URL: https://issues.apache.org/jira/browse/KAFKA-6082 Project: Kafka Issue Type: Sub-task Reporter: Onur Karaman If we want, we can use multi-op to fence zookeeper updates with the controller epoch's zkVersion. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
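A minimal sketch of the fencing idea, modeled with an in-memory versioned store rather than a real ZooKeeper client. The class `FencedStore`, its methods, and the paths used with it are hypothetical and exist only for illustration. With the real client, the analogous building blocks would be a `multi` call that combines `Op.check` on the controller epoch znode with `Op.setData` on the target znode.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a versioned znode store; not a real ZooKeeper client.
// Single-threaded model: no synchronization is attempted.
final class FencedStore {
    private static final class Node {
        String data;
        int version;

        Node(String data) {
            this.data = data;
        }
    }

    private final Map<String, Node> nodes = new HashMap<>();

    void create(String path, String data) {
        nodes.put(path, new Node(data));
    }

    // Unconditional write that bumps the node's version.
    void set(String path, String data) {
        Node node = nodes.get(path);
        node.data = data;
        node.version++;
    }

    int version(String path) {
        return nodes.get(path).version;
    }

    String data(String path) {
        return nodes.get(path).data;
    }

    // Models a two-op transaction: check the epoch node's version, then write the target.
    // Either both steps take effect or neither does.
    boolean fencedSet(String epochPath, int expectedEpochVersion, String targetPath, String newData) {
        Node epoch = nodes.get(epochPath);
        if (epoch == null || epoch.version != expectedEpochVersion) {
            return false; // the epoch node changed since we read it; reject the write
        }
        set(targetPath, newData);
        return true;
    }
}
```

A controller that read the epoch node at version 0 can keep writing until another controller bumps the epoch. After that, every `fencedSet` carrying the stale version is rejected without touching the target.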
[jira] [Created] (KAFKA-6081) response error code checking
Onur Karaman created KAFKA-6081: --- Summary: response error code checking Key: KAFKA-6081 URL: https://issues.apache.org/jira/browse/KAFKA-6081 Project: Kafka Issue Type: Sub-task Reporter: Onur Karaman In most cases in the controller, we assume that requests succeed. We should instead check for their responses. Example: partition reassignment has the following todo: {code} // TODO: Eventually partition reassignment could use a callback that does retries if deletion failed {code}
[jira] [Resolved] (KAFKA-5083) always leave the last surviving member of the ISR in ZK
[ https://issues.apache.org/jira/browse/KAFKA-5083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman resolved KAFKA-5083. - Resolution: Fixed This has been fixed in KAFKA-5642. > always leave the last surviving member of the ISR in ZK > --- > > Key: KAFKA-5083 > URL: https://issues.apache.org/jira/browse/KAFKA-5083 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > > Currently we erase ISR membership if the replica to be removed from the ISR > is the last surviving member of the ISR and unclean leader election is > enabled for the corresponding topic. > We should investigate leaving the last replica in ISR in ZK, independent of > whether unclean leader election is enabled or not. That way, if people > re-disabled unclean leader election, we can still try to elect the leader > from the last in-sync replica.
[jira] [Created] (KAFKA-6065) Add zookeeper metrics to ZookeeperClient as in KIP-188
Onur Karaman created KAFKA-6065: --- Summary: Add zookeeper metrics to ZookeeperClient as in KIP-188 Key: KAFKA-6065 URL: https://issues.apache.org/jira/browse/KAFKA-6065 Project: Kafka Issue Type: Sub-task Reporter: Onur Karaman Among other things, KIP-188 added latency metrics to ZkUtils. We should add the same metrics to ZookeeperClient.
[jira] [Created] (KAFKA-6014) new consumer mirror maker halts after committing offsets to a deleted topic
Onur Karaman created KAFKA-6014: --- Summary: new consumer mirror maker halts after committing offsets to a deleted topic Key: KAFKA-6014 URL: https://issues.apache.org/jira/browse/KAFKA-6014 Project: Kafka Issue Type: Bug Reporter: Onur Karaman

New consumer throws an unexpected KafkaException when trying to commit to a topic that has been deleted. MirrorMaker.commitOffsets doesn't attempt to catch the KafkaException and just kills the process. We didn't see this in the old consumer because old consumer just silently drops failed offset commits.

I ran a quick experiment locally to prove the behavior. The experiment:
1. start up a single broker
2. create a single-partition topic t
3. create a new consumer that consumes topic t
4. make the consumer commit every few seconds
5. delete topic t
6. expect: KafkaException that kills the process.

Here's my script:
{code}
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class OffsetCommitTopicDeletionTest {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9090");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(props);
        TopicPartition partition = new TopicPartition("t", 0);
        List<TopicPartition> partitions = Collections.singletonList(partition);
        kafkaConsumer.assign(partitions);
        while (true) {
            kafkaConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(0, "")));
            Thread.sleep(1000);
        }
    }
}
{code}
Here are the other commands:
{code}
> rm -rf /tmp/zookeeper/ /tmp/kafka-logs* logs*
> ./gradlew clean jar
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
> export LOG_DIR=logs0 && ./bin/kafka-server-start.sh config/server0.properties
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t --partitions 1 --replication-factor 1
> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.OffsetCommitTopicDeletionTest
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic t
{code}
Here is the output:
{code}
[2017-10-04 20:00:14,451] ERROR [Consumer clientId=consumer-1, groupId=g] Offset commit failed on partition t-0 at offset 0: This server does not host this topic-partition. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
Exception in thread "main" org.apache.kafka.common.KafkaException: Partition t-0 may not exist or user may not have Describe access to topic
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:789)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:734)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:506)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:268)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:190)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1231)
    at org.apache.kafka.clients.consumer.OffsetCommitTopicDeletionTest.main(OffsetCommitTopicDeletionTest.java:22)
[jira] [Created] (KAFKA-5894) add the notion of max inflight requests to async ZookeeperClient
Onur Karaman created KAFKA-5894: --- Summary: add the notion of max inflight requests to async ZookeeperClient Key: KAFKA-5894 URL: https://issues.apache.org/jira/browse/KAFKA-5894 Project: Kafka Issue Type: Sub-task Reporter: Onur Karaman Assignee: Onur Karaman

ZookeeperClient is a zookeeper client that encourages pipelined requests to zookeeper. We want to add the notion of max inflight requests to the client for several reasons:
# to bound memory overhead associated with async requests on the client.
# to not overwhelm the zookeeper ensemble with a burst of requests.
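One common way to bound in-flight async requests is a counting semaphore: take a permit before sending and return it when the response arrives. The sketch below assumes that approach. `InflightLimiter` and its methods are hypothetical names, not part of ZookeeperClient, and the request is represented by any supplier of a `CompletableFuture`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical helper: caps the number of outstanding async requests with a semaphore.
final class InflightLimiter {
    private final Semaphore permits;

    InflightLimiter(int maxInflight) {
        this.permits = new Semaphore(maxInflight);
    }

    // Blocks the caller until a permit is free, then starts the request.
    // The permit is returned when the request completes, successfully or not.
    <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> request) {
        permits.acquireUninterruptibly();
        try {
            return request.get().whenComplete((result, error) -> permits.release());
        } catch (RuntimeException e) {
            permits.release(); // the request never started, so give the permit back
            throw e;
        }
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

Blocking the sender once the limit is reached is one possible design choice. It bounds both the client-side memory held by pending requests and the burst size seen by the ensemble, at the cost of stalling the calling thread.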
[jira] [Resolved] (KAFKA-4747) add metrics for KafkaConsumer.poll
[ https://issues.apache.org/jira/browse/KAFKA-4747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman resolved KAFKA-4747. - Resolution: Won't Fix

[~junrao] pointed out that the distinction between time-in-poll and time-in-application can be effectively computed as 1 - (io-ratio) - (io-wait-ratio). If this value is close to 1, then time is mostly being spent on the application-side. Otherwise if this value is close to 0, then time is mostly being spent on the client-side.

Here's a simple experiment I ran to verify:
{code}
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
 * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
 * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Properties;

public class SlowKafkaConsumer {
    public static void main(String[] args) throws InterruptedException {
        long pollTimeout = Long.valueOf(args[0]);
        long sleepDuration = Long.valueOf(args[1]);
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9090");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "onur");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.assign(Collections.singletonList(new TopicPartition("t", 0)));
        kafkaConsumer.seekToBeginning(Collections.singletonList(new TopicPartition("t", 0)));
        while (true) {
            kafkaConsumer.poll(pollTimeout);
            Thread.sleep(sleepDuration);
        }
    }
}
{code}
{code}
no data
===
> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.SlowKafkaConsumer 2000 0
io-ratio ~ 0
io-wait-ratio ~ 0.99
> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.SlowKafkaConsumer 2000 1
io-ratio ~ 0
io-wait-ratio ~ [0.1, 0.2]
> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.SlowKafkaConsumer 2000 2
io-ratio ~ 0
io-wait-ratio ~ [0.05, 0.12]
with data
=
> ./bin/kafka-producer-perf-test.sh --producer-props bootstrap.servers=localhost:9090 --topic t --throughput -1 --num-records 1 --record-size 1000
> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.SlowKafkaConsumer 2000 0
io-ratio ~ 0.06
io-wait-ratio ~ 0.8
> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.SlowKafkaConsumer 2000 1
io-ratio ~ 0
io-wait-ratio ~ [0.05, 0.1]
> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.SlowKafkaConsumer 2000 2
io-ratio ~ 0
io-wait-ratio ~ [0, 0.03] {code} > add metrics for KafkaConsumer.poll > -- > > Key: KAFKA-4747 > URL: https://issues.apache.org/jira/browse/KAFKA-4747 > Project: Kafka > Issue Type: Improvement >Reporter: Onur Karaman >Assignee: Onur Karaman > > KafkaConsumer heavily depends on KafkaConsumer.poll yet we don't have metrics > directly associated with it. > We probably want to add two metrics: > 1. time spent in KafkaConsumer.poll > 2. time since last KafkaConsumer.poll (measured as now - endTimeOfLastPoll)
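The formula 1 - (io-ratio) - (io-wait-ratio) from the resolution comment can be checked against the reported measurements with a few lines of arithmetic. `PollTimeEstimate` is a hypothetical helper written for this illustration; its inputs are just the two metric values read from the consumer.

```java
// Hypothetical helper; the inputs are the consumer's io-ratio and io-wait-ratio metric values.
final class PollTimeEstimate {
    // Fraction of time attributed to the application side, per the formula in the comment.
    static double applicationRatio(double ioRatio, double ioWaitRatio) {
        return 1.0 - ioRatio - ioWaitRatio;
    }
}
```

For the "no data" run with no sleep (io-ratio ~ 0, io-wait-ratio ~ 0.99) this gives roughly 0.01, so almost all time is on the client side. For the "with data" run with no sleep (io-ratio ~ 0.06, io-wait-ratio ~ 0.8) it gives roughly 0.14.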
[jira] [Resolved] (KAFKA-5703) allow debug-level logging for RequestChannel's request logger
[ https://issues.apache.org/jira/browse/KAFKA-5703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman resolved KAFKA-5703. - Resolution: Fixed Woops. Looks like [~ijuma] already fixed this 3 days ago: 4086db472d08f6dee4d30dda82ab9ff7b67d1a20 > allow debug-level logging for RequestChannel's request logger > - > > Key: KAFKA-5703 > URL: https://issues.apache.org/jira/browse/KAFKA-5703 > Project: Kafka > Issue Type: Bug >Reporter: Onur Karaman >Assignee: Onur Karaman > > Git hash d25671884bbbdf7843ada3e7797573a00ac7cd56 introduced a bug in > RequestChannel's request logger that causes debug-level logging to never > occur. > {code} > - if (requestLogger.isTraceEnabled) > -requestLogger.trace("Completed request:%s from connection > %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s" > - .format(requestDesc(true), connectionId, totalTime, > requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, > responseSendTime, securityProtocol, session.principal)) > - else if (requestLogger.isDebugEnabled) > -requestLogger.debug("Completed request:%s from connection > %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s" > - .format(requestDesc(false), connectionId, totalTime, > requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, > responseSendTime, securityProtocol, session.principal)) > + if (requestLogger.isDebugEnabled) { > +val detailsEnabled = requestLogger.isTraceEnabled > +requestLogger.trace("Completed request:%s from connection > %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s,listener:%s" > + .format(requestDesc(detailsEnabled), connectionId, totalTime, > requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, > responseSendTime, securityProtocol, session.principal, 
listenerName.value)) > + } > {code} > So trace-level logging is used even if debug-level logging is specified, > causing users to not see the non-detailed request logs.
[jira] [Created] (KAFKA-5703) allow debug-level logging for RequestChannel's request logger
Onur Karaman created KAFKA-5703: --- Summary: allow debug-level logging for RequestChannel's request logger Key: KAFKA-5703 URL: https://issues.apache.org/jira/browse/KAFKA-5703 Project: Kafka Issue Type: Bug Reporter: Onur Karaman Assignee: Onur Karaman

Git hash d25671884bbbdf7843ada3e7797573a00ac7cd56 introduced a bug in RequestChannel's request logger that causes debug-level logging to never occur.
{code}
-  if (requestLogger.isTraceEnabled)
-    requestLogger.trace("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s"
-      .format(requestDesc(true), connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal))
-  else if (requestLogger.isDebugEnabled)
-    requestLogger.debug("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s"
-      .format(requestDesc(false), connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal))
+  if (requestLogger.isDebugEnabled) {
+    val detailsEnabled = requestLogger.isTraceEnabled
+    requestLogger.trace("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s,listener:%s"
+      .format(requestDesc(detailsEnabled), connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal, listenerName.value))
+  }
{code}
So trace-level logging is used even if debug-level logging is specified, causing users to not see the non-detailed request logs.
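The effect of the bug can be reproduced with a toy logger that records lines instead of printing them. Everything in this sketch is hypothetical (`LevelLogger`, `RequestLogging`, the message strings). `logBuggy` mirrors the shape of the added lines in the diff, and `logFixed` restores the shape of the removed lines. It is not necessarily the change that was actually committed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a leveled logger; it records lines instead of printing them.
final class LevelLogger {
    enum Level { TRACE, DEBUG, INFO } // most verbose first

    private final Level threshold;
    final List<String> lines = new ArrayList<>();

    LevelLogger(Level threshold) {
        this.threshold = threshold;
    }

    boolean isTraceEnabled() { return threshold == Level.TRACE; }
    boolean isDebugEnabled() { return threshold.compareTo(Level.DEBUG) <= 0; }

    void trace(String message) { if (isTraceEnabled()) lines.add("TRACE " + message); }
    void debug(String message) { if (isDebugEnabled()) lines.add("DEBUG " + message); }
}

final class RequestLogging {
    // Same shape as the added lines: guarded by debug, but always emitted at trace level.
    static void logBuggy(LevelLogger log, String summary, String details) {
        if (log.isDebugEnabled()) {
            boolean detailsEnabled = log.isTraceEnabled();
            log.trace(detailsEnabled ? details : summary);
        }
    }

    // Same shape as the removed lines: detailed at trace, otherwise non-detailed at debug.
    static void logFixed(LevelLogger log, String summary, String details) {
        if (log.isTraceEnabled()) {
            log.trace(details);
        } else if (log.isDebugEnabled()) {
            log.debug(summary);
        }
    }
}
```

With a logger set to DEBUG, `logBuggy` records nothing, because the trace call is filtered out, while `logFixed` records the non-detailed line at debug level.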
[jira] [Resolved] (KAFKA-5501) introduce async ZookeeperClient
[ https://issues.apache.org/jira/browse/KAFKA-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman resolved KAFKA-5501. - Resolution: Fixed > introduce async ZookeeperClient > --- > > Key: KAFKA-5501 > URL: https://issues.apache.org/jira/browse/KAFKA-5501 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > Fix For: 1.0.0 > > > Synchronous zookeeper apis means that we wait an entire round trip before > doing the next operation. We should introduce a zookeeper client that > encourages pipelined requests to zookeeper.
[jira] [Created] (KAFKA-5642) use async ZookeeperClient everywhere
Onur Karaman created KAFKA-5642: --- Summary: use async ZookeeperClient everywhere Key: KAFKA-5642 URL: https://issues.apache.org/jira/browse/KAFKA-5642 Project: Kafka Issue Type: Sub-task Reporter: Onur Karaman Assignee: Onur Karaman

Synchronous zookeeper writes mean that we wait an entire round trip before doing the next write. These synchronous writes are happening at a per-partition granularity in several places, so partition-heavy clusters suffer from the controller doing many sequential round trips to zookeeper.
* PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in zookeeper on transition to OnlinePartition. This gets triggered per-partition sequentially with synchronous writes during controlled shutdown of the shutting down broker's replicas for which it is the leader.
* ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets triggered per-partition sequentially with synchronous writes for failed or controlled shutdown brokers.

KAFKA-5501 introduced an async ZookeeperClient that encourages pipelined requests to zookeeper. We should replace ZkClient's usage with this client.
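The difference between sequential round trips and pipelining can be sketched with `CompletableFuture`: issue every request before waiting on any response. `PipelinedWrites` and its `send` parameter are hypothetical. `send` stands in for whatever async write the client exposes and is not the ZookeeperClient API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical helper showing the pipelined shape; `send` stands in for an async write.
final class PipelinedWrites {
    // Issues every request up front, then returns a future that completes
    // once all responses are in, with results in request order.
    static <T> CompletableFuture<List<T>> sendAll(List<String> paths,
                                                  Function<String, CompletableFuture<T>> send) {
        List<CompletableFuture<T>> pending = new ArrayList<>();
        for (String path : paths) {
            pending.add(send.apply(path)); // no waiting between requests
        }
        return CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    List<T> results = new ArrayList<>();
                    for (CompletableFuture<T> future : pending) {
                        results.add(future.join()); // already complete, so join does not block
                    }
                    return results;
                });
    }
}
```

With N partitions, the synchronous approach pays roughly N round trips back to back, whereas this shape overlaps them, so the total wait is closer to the slowest single response plus server-side processing.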
[jira] [Created] (KAFKA-5502) read current brokers from zookeeper upon processing broker change
Onur Karaman created KAFKA-5502: --- Summary: read current brokers from zookeeper upon processing broker change Key: KAFKA-5502 URL: https://issues.apache.org/jira/browse/KAFKA-5502 Project: Kafka Issue Type: Sub-task Reporter: Onur Karaman Assignee: Onur Karaman

[~lindong]'s testing of the 0.11.0 release revealed a controller-side performance regression in clusters with many brokers and many partitions when bringing up many brokers simultaneously.

The regression is caused by KAFKA-5028: a Watcher receives WatchedEvent notifications from the raw ZooKeeper client EventThread. A WatchedEvent only contains the following information:
- KeeperState
- EventType
- path

Note that it does not actually contain the current data or current set of children associated with the data/child change notification. It is up to the user to do this lookup to see the current data or set of children.

ZkClient is itself a Watcher. When it receives a WatchedEvent, it puts a ZkEvent into its own queue which its own ZkEventThread processes. Users of ZkClient interact with these notifications through listeners (IZkDataListener, IZkChildListener). IZkDataListener actually expects as input the current data of the watched znode, and likewise IZkChildListener actually expects as input the current set of children of the watched znode. In order to provide this information to the listeners, the ZkEventThread, when processing the ZkEvent in its queue, looks up the information (either the current data or current set of children), simultaneously sets up the next watch, and passes the result to the listener.

The regression introduced in KAFKA-5028 is the time at which we look up the information needed for the event processing.

In the past, the lookup from the ZkEventThread during ZkEvent processing would be passed into the listener which is processed immediately after. For instance in ZkClient.fireChildChangedEvents:
{code}
List<String> children = getChildren(path);
listener.handleChildChange(path, children);
{code}
Now, however, there are multiple listeners that pass information looked up by the ZkEventThread into a ControllerEvent which gets processed potentially much later. For instance in BrokerChangeListener:
{code}
class BrokerChangeListener(controller: KafkaController) extends IZkChildListener with Logging {
  override def handleChildChange(parentPath: String, currentChilds: java.util.List[String]): Unit = {
    import JavaConverters._
    controller.addToControllerEventQueue(controller.BrokerChange(currentChilds.asScala))
  }
}
{code}
In terms of impact, this:
- increases the odds of working with stale information by the time the ControllerEvent gets processed.
- can cause the cluster to take a long time to stabilize if you bring up many brokers simultaneously.

In terms of how to solve it:
- (short term) just ignore the ZkClient's information lookup and repeat the lookup at the start of the ControllerEvent. This is the approach taken in this ticket.
- (long term) try to remove a queue. This basically means getting rid of ZkClient. This is likely the approach that will be taken in KAFKA-5501.
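The short-term approach can be illustrated with a small queue model: the notification only enqueues "something changed", and the handler reads the live broker set when the event is actually processed. `BrokerChangeQueue` and its methods are hypothetical and do not correspond to real controller classes. The supplier stands in for a fresh read from zookeeper.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Supplier;

// Hypothetical model of the short-term fix; names do not correspond to real controller classes.
final class BrokerChangeQueue {
    private final Queue<Runnable> events = new ArrayDeque<>();
    private Set<String> lastProcessed = new TreeSet<>();

    // The notification only signals that something changed. No snapshot is captured here;
    // the live set is read through the supplier when the event runs.
    void onBrokerChange(Supplier<Set<String>> readCurrentBrokers) {
        events.add(() -> lastProcessed = new TreeSet<>(readCurrentBrokers.get()));
    }

    // Drains the queue in order, as a single event-processing thread would.
    void processAll() {
        Runnable event;
        while ((event = events.poll()) != null) {
            event.run();
        }
    }

    Set<String> lastProcessed() {
        return lastProcessed;
    }
}
```

If brokers 2 and 3 register after the notification is enqueued but before it is processed, the handler still sees all three brokers, instead of the single-broker snapshot taken at notification time.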
[jira] [Created] (KAFKA-5501) use async zookeeper apis everywhere
Onur Karaman created KAFKA-5501: --- Summary: use async zookeeper apis everywhere Key: KAFKA-5501 URL: https://issues.apache.org/jira/browse/KAFKA-5501 Project: Kafka Issue Type: Sub-task Reporter: Onur Karaman Assignee: Onur Karaman

Synchronous zookeeper writes mean that we wait an entire round trip before doing the next write. These synchronous writes are happening at a per-partition granularity in several places, so partition-heavy clusters suffer from the controller doing many sequential round trips to zookeeper.
* PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in zookeeper on transition to OnlinePartition. This gets triggered per-partition sequentially with synchronous writes during controlled shutdown of the shutting down broker's replicas for which it is the leader.
* ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets triggered per-partition sequentially with synchronous writes for failed or controlled shutdown brokers.
[jira] [Commented] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers
[ https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16035465#comment-16035465 ] Onur Karaman commented on KAFKA-4595: - [~ijuma] Yeah I think KAFKA-5028 solved this issue. > Controller send thread can't stop when broker change listener event trigger > for dead brokers > - > > Key: KAFKA-4595 > URL: https://issues.apache.org/jira/browse/KAFKA-4595 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.0, 0.10.1.1 >Reporter: Pengwei >Priority: Critical > Labels: reliability > Fix For: 0.11.0.1 > > > In our test env, we found controller is not working after a delete topic > opertation and network issue, the stack is below: > "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" > #15 daemon prio=5 os_prio=0 tid=0x7fb76416e000 nid=0x3019 waiting on > condition [0x7fb76b7c8000] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0xc05497b8> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) > at > kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50) > at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32) > at > kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128) > at > kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81) > - locked <0xc0258760> (a 
java.lang.Object) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369) > at scala.collection.immutable.Set$Set1.foreach(Set.scala:79) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356) > at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842) > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) >Locked ownable synchronizers: > - <0xc02587f8> (a > java.util.concurrent.locks.ReentrantLock$NonfairSync) > 
"Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0 > tid=0x7fb778342000 nid=0x5a4c waiting on condition [0x7fb761de] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0xc02587f8> (a > java.util.concurrent.locks.ReentrantLock$NonfairSync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at >
[jira] [Commented] (KAFKA-1595) Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount
[ https://issues.apache.org/jira/browse/KAFKA-1595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16035365#comment-16035365 ] Onur Karaman commented on KAFKA-1595: - Sounds good. > Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount > - > > Key: KAFKA-1595 > URL: https://issues.apache.org/jira/browse/KAFKA-1595 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.8.1.1 >Reporter: Jagbir >Assignee: Ismael Juma > Labels: newbie > > The following issue is created as a follow up suggested by Jun Rao > in a kafka news group message with the Subject > "Blocking Recursive parsing from > kafka.consumer.TopicCount$.constructTopicCount" > SUMMARY: > An issue was detected in a typical cluster of 3 kafka instances backed > by 3 zookeeper instances (kafka version 0.8.1.1, scala version 2.10.3, > java version 1.7.0_65). On consumer end, when consumers get recycled, > there is a troubling JSON parsing recursion which takes a busy lock and > blocks consumers thread pool. > In 0.8.1.1 scala client library ZookeeperConsumerConnector.scala:355 takes > a global lock (0xd3a7e1d0) during the rebalance, and fires an > expensive JSON parsing, while keeping the other consumers from shutting > down, see, e.g, > at > kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161) > The deep recursive JSON parsing should be deprecated in favor > of a better JSON parser, see, e.g, > http://engineering.ooyala.com/blog/comparing-scala-json-libraries? > DETAILS: > The first dump is for a recursive blocking thread holding the lock for > 0xd3a7e1d0 > and the subsequent dump is for a waiting thread. > (Please grep for 0xd3a7e1d0 to see the locked object.) 
> Â > -8<- > "Sa863f22b1e5hjh6788991800900b34545c_profile-a-prod1-s-140789080845312-c397945e8_watcher_executor" > prio=10 tid=0x7f24dc285800 nid=0xda9 runnable [0x7f249e40b000] > java.lang.Thread.State: RUNNABLE > at > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.p$7(Parsers.scala:722) > at > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.continue$1(Parsers.scala:726) > at > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:737) > at > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:721) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at >
[jira] [Comment Edited] (KAFKA-1595) Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount
[ https://issues.apache.org/jira/browse/KAFKA-1595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16032276#comment-16032276 ] Onur Karaman edited comment on KAFKA-1595 at 6/1/17 12:50 AM: -- I think [~ijuma]'s PR (https://github.com/apache/kafka/pull/83) is worth reinvestigating as a means of improving the controller (especially with respect to controller initialization time). I can help out if needed. was (Author: onurkaraman): I think [~ijuma]'s PR (https://github.com/apache/kafka/pull/83) is worth reinvestigating as a means of improving the controller (especially with respect to controller initialization time). > Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount > - > > Key: KAFKA-1595 > URL: https://issues.apache.org/jira/browse/KAFKA-1595 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.8.1.1 >Reporter: Jagbir >Assignee: Ismael Juma > Labels: newbie > > The following issue is created as a follow up suggested by Jun Rao > in a kafka news group message with the Subject > "Blocking Recursive parsing from > kafka.consumer.TopicCount$.constructTopicCount" > SUMMARY: > An issue was detected in a typical cluster of 3 kafka instances backed > by 3 zookeeper instances (kafka version 0.8.1.1, scala version 2.10.3, > java version 1.7.0_65). On consumer end, when consumers get recycled, > there is a troubling JSON parsing recursion which takes a busy lock and > blocks consumers thread pool. > In 0.8.1.1 scala client library ZookeeperConsumerConnector.scala:355 takes > a global lock (0xd3a7e1d0) during the rebalance, and fires an > expensive JSON parsing, while keeping the other consumers from shutting > down, see, e.g, > at > kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161) > The deep recursive JSON parsing should be deprecated in favor > of a better JSON parser, see, e.g, > http://engineering.ooyala.com/blog/comparing-scala-json-libraries? 
> DETAILS: > The first dump is for a recursive blocking thread holding the lock for > 0xd3a7e1d0 > and the subsequent dump is for a waiting thread. > (Please grep for 0xd3a7e1d0 to see the locked object.) > Â > -8<- > "Sa863f22b1e5hjh6788991800900b34545c_profile-a-prod1-s-140789080845312-c397945e8_watcher_executor" > prio=10 tid=0x7f24dc285800 nid=0xda9 runnable [0x7f249e40b000] > java.lang.Thread.State: RUNNABLE > at > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.p$7(Parsers.scala:722) > at > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.continue$1(Parsers.scala:726) > at > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:737) > at > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:721) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at >
[jira] [Commented] (KAFKA-1595) Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount
[ https://issues.apache.org/jira/browse/KAFKA-1595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16032276#comment-16032276 ] Onur Karaman commented on KAFKA-1595: - I think [~ijuma]'s PR (https://github.com/apache/kafka/pull/83) is worth reinvestigating as a means of improving the controller (especially with respect to controller initialization time). > Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount > - > > Key: KAFKA-1595 > URL: https://issues.apache.org/jira/browse/KAFKA-1595 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.8.1.1 >Reporter: Jagbir >Assignee: Ismael Juma > Labels: newbie > > The following issue is created as a follow up suggested by Jun Rao > in a kafka news group message with the Subject > "Blocking Recursive parsing from > kafka.consumer.TopicCount$.constructTopicCount" > SUMMARY: > An issue was detected in a typical cluster of 3 kafka instances backed > by 3 zookeeper instances (kafka version 0.8.1.1, scala version 2.10.3, > java version 1.7.0_65). On consumer end, when consumers get recycled, > there is a troubling JSON parsing recursion which takes a busy lock and > blocks consumers thread pool. > In 0.8.1.1 scala client library ZookeeperConsumerConnector.scala:355 takes > a global lock (0xd3a7e1d0) during the rebalance, and fires an > expensive JSON parsing, while keeping the other consumers from shutting > down, see, e.g, > at > kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161) > The deep recursive JSON parsing should be deprecated in favor > of a better JSON parser, see, e.g, > http://engineering.ooyala.com/blog/comparing-scala-json-libraries? > DETAILS: > The first dump is for a recursive blocking thread holding the lock for > 0xd3a7e1d0 > and the subsequent dump is for a waiting thread. > (Please grep for 0xd3a7e1d0 to see the locked object.) 
> Â > -8<- > "Sa863f22b1e5hjh6788991800900b34545c_profile-a-prod1-s-140789080845312-c397945e8_watcher_executor" > prio=10 tid=0x7f24dc285800 nid=0xda9 runnable [0x7f249e40b000] > java.lang.Thread.State: RUNNABLE > at > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.p$7(Parsers.scala:722) > at > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.continue$1(Parsers.scala:726) > at > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:737) > at > scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:721) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) > at > scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) > at >
[jira] [Commented] (KAFKA-5328) consider switching json parser from scala to jackson
[ https://issues.apache.org/jira/browse/KAFKA-5328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027086#comment-16027086 ] Onur Karaman commented on KAFKA-5328: -

I copied the topic assignment znodes and partition state znodes from a cluster into two text files and timed how long it took to parse them with both the scala json parser and jackson. This experiment had 3066 topics and 95895 partitions (the cluster I got the znodes from was a third of that size, but I duplicated lines to reflect some of our other clusters).

Here's the code:
{code}
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka.tools

import java.nio.file.{Files, Paths}

import com.fasterxml.jackson.databind.ObjectMapper
import kafka.api.LeaderAndIsr
import kafka.common.TopicAndPartition
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.utils.Json

import scala.collection.JavaConverters._
import scala.collection.Seq

object JsonPerformance {
  def main(args: Array[String]): Unit = {
    val assignments = Files.readAllLines(Paths.get("/Users/okaraman/code/read-zk-state/raw_assignments_big.txt"))
    val states = Files.readAllLines(Paths.get("/Users/okaraman/code/read-zk-state/raw_states_big.txt"))
    time(assignments.asScala.foreach(parseAssignmentWithScala), "scala json parser assignments")
    time(assignments.asScala.foreach(parseAssignmentWithJackson), "jackson json parser assignments")
    time(states.asScala.foreach(parseStateWithScala), "scala json parser states")
    time(states.asScala.foreach(parseStateWithJackson), "jackson json parser states")
  }

  def time(f: => Unit, messagePrefix: String): Unit = {
    val start = System.currentTimeMillis()
    f
    val end = System.currentTimeMillis()
    println(messagePrefix + s" took: ${end - start} ms")
  }

  def parseAssignmentWithScala(assignment: String): Map[TopicAndPartition, Seq[Int]] = {
    val json = Json.parseFull(assignment).get
    val jsonPartitions = json.asInstanceOf[Map[String, Any]].get("partitions").get
    val replicaMap = jsonPartitions.asInstanceOf[Map[String, Seq[Int]]]
    replicaMap.map { case (partition, replicas) => TopicAndPartition("t", partition.toInt) -> replicas }
  }

  def parseAssignmentWithJackson(assignment: String): Map[TopicAndPartition, Seq[Int]] = {
    val mapper = new ObjectMapper()
    val json = mapper.readTree(assignment)
    json.get("partitions").asScala.toList.flatMap(x => (0 until x.size()).map(i => TopicAndPartition("t", i) -> x.get(i).asScala.toSeq.map(_.asInt()))).toMap
  }

  def parseStateWithScala(state: String): LeaderIsrAndControllerEpoch = {
    val json = Json.parseFull(state).get
    val leaderIsrAndEpochInfo = json.asInstanceOf[Map[String, Any]]
    val leader = leaderIsrAndEpochInfo.get("leader").get.asInstanceOf[Int]
    val epoch = leaderIsrAndEpochInfo.get("leader_epoch").get.asInstanceOf[Int]
    val isr = leaderIsrAndEpochInfo.get("isr").get.asInstanceOf[List[Int]]
    val controllerEpoch = leaderIsrAndEpochInfo.get("controller_epoch").get.asInstanceOf[Int]
    LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, 1), controllerEpoch)
  }

  def parseStateWithJackson(state: String): LeaderIsrAndControllerEpoch = {
    val mapper = new ObjectMapper()
    val json = mapper.readTree(state)
    val leader = json.get("leader").asInt()
    val epoch = json.get("leader_epoch").asInt()
    val isr = json.get("isr").asScala.toList.map(_.asInt())
    val controllerEpoch = json.get("controller_epoch").asInt()
    LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, 1), controllerEpoch)
  }
}
{code}
Here's the output:
{code}
scala json parser assignments took: 8730 ms
jackson json parser assignments took: 354 ms
scala json parser states took: 28395 ms
jackson json parser states took: 360 ms
{code}
So controller initialization time spent on json parsing would be reduced from 37.1 seconds down to 0.7 seconds.

> consider switching json parser from scala to jackson > > > Key: KAFKA-5328 > URL:
[jira] [Created] (KAFKA-5328) consider switching json parser from scala to jackson
Onur Karaman created KAFKA-5328: --- Summary: consider switching json parser from scala to jackson Key: KAFKA-5328 URL: https://issues.apache.org/jira/browse/KAFKA-5328 Project: Kafka Issue Type: Sub-task Reporter: Onur Karaman Assignee: Onur Karaman The scala json parser is significantly slower than jackson. This can have a nontrivial impact on controller initialization since the controller loads and parses almost all zookeeper state. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (KAFKA-5323) AdminUtils.createTopic should check topic existence upfront
[ https://issues.apache.org/jira/browse/KAFKA-5323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-5323: Status: Patch Available (was: Open) > AdminUtils.createTopic should check topic existence upfront > --- > > Key: KAFKA-5323 > URL: https://issues.apache.org/jira/browse/KAFKA-5323 > Project: Kafka > Issue Type: Improvement >Reporter: Onur Karaman >Assignee: Onur Karaman > > When a topic exists, AdminUtils.createTopic unnecessarily does N+2 zookeeper > reads where N is the number of brokers. Here is the breakdown of the N+2 > zookeeper reads: > # reads the current list of brokers in zookeeper (1 zookeeper read) > # reads metadata for each broker in zookeeper (N zookeeper reads where N is > the number of brokers) > # checks for topic existence in zookeeper (1 zookeeper read) > We can reduce the N+2 reads down to 1 by checking topic existence upfront. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-5323) AdminUtils.createTopic should check topic existence upfront
[ https://issues.apache.org/jira/browse/KAFKA-5323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16024223#comment-16024223 ] Onur Karaman commented on KAFKA-5323: - This can have a larger impact than one might initially suspect. For instance, a broker only populates its MetadataCache after it has joined the cluster and the controller sends it an UpdateMetadataRequest. But a broker can begin processing requests even before registering itself in zookeeper (before the controller even knows the broker is alive). In other words, a broker can begin processing MetadataRequests before processing the controller's UpdateMetadataRequest following broker registration. Processing these MetadataRequests in this scenario leads to large local times and can cause substantial request queue backup, causing significant delays in the broker processing its initial UpdateMetadataRequest. Since the broker hasn't received any UpdateMetadataRequest from the controller yet, its MetadataCache is empty. So the topics from all the client MetadataRequests are treated as brand new topics, which means the broker tries to auto create these topics. For each pre-existing topic queried in the MetadataRequest, auto topic creation performs the N+2 zookeeper reads mentioned earlier. In one bad production scenario (while recovering from KAFKA-4959), this caused a significant delay in bringing replicas online, as both the initial LeaderAndIsrRequest and UpdateMetadataRequest from the controller on broker startup were stuck behind these client MetadataRequests hammering zookeeper. > AdminUtils.createTopic should check topic existence upfront > --- > > Key: KAFKA-5323 > URL: https://issues.apache.org/jira/browse/KAFKA-5323 > Project: Kafka > Issue Type: Improvement >Reporter: Onur Karaman >Assignee: Onur Karaman > > When a topic exists, AdminUtils.createTopic unnecessarily does N+2 zookeeper > reads where N is the number of brokers. 
Here is the breakdown of the N+2 > zookeeper reads: > # reads the current list of brokers in zookeeper (1 zookeeper read) > # reads metadata for each broker in zookeeper (N zookeeper reads where N is > the number of brokers) > # checks for topic existence in zookeeper (1 zookeeper read) > We can reduce the N+2 reads down to 1 by checking topic existence upfront. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-5323) AdminUtils.createTopic should check topic existence upfront
Onur Karaman created KAFKA-5323: --- Summary: AdminUtils.createTopic should check topic existence upfront Key: KAFKA-5323 URL: https://issues.apache.org/jira/browse/KAFKA-5323 Project: Kafka Issue Type: Improvement Reporter: Onur Karaman Assignee: Onur Karaman When a topic exists, AdminUtils.createTopic unnecessarily does N+2 zookeeper reads where N is the number of brokers. Here is the breakdown of the N+2 zookeeper reads: # reads the current list of brokers in zookeeper (1 zookeeper read) # reads metadata for each broker in zookeeper (N zookeeper reads where N is the number of brokers) # checks for topic existence in zookeeper (1 zookeeper read) We can reduce the N+2 reads down to 1 by checking topic existence upfront. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
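The read counts in this ticket can be illustrated with a minimal sketch. This is not the AdminUtils code: `ZkReads`, `CountingZk`, and `CreateTopicSketch` are hypothetical names introduced here, and the in-memory fake only counts reads so the two orderings can be compared when the topic already exists.

```scala
// Hypothetical stand-in for the zookeeper reads involved; not a Kafka API.
trait ZkReads {
  def brokerIds(): Seq[Int]                // 1 read: current list of brokers
  def brokerMetadata(id: Int): String      // 1 read per broker
  def topicExists(topic: String): Boolean  // 1 read
}

// In-memory fake that counts every read.
class CountingZk(brokers: Seq[Int], topics: Set[String]) extends ZkReads {
  var reads = 0
  def brokerIds(): Seq[Int] = { reads += 1; brokers }
  def brokerMetadata(id: Int): String = { reads += 1; s"broker-$id" }
  def topicExists(topic: String): Boolean = { reads += 1; topics.contains(topic) }
}

object CreateTopicSketch {
  // Ordering described in the ticket: broker list, per-broker metadata,
  // and only then the existence check.
  def existenceCheckedLast(zk: ZkReads, topic: String): Either[String, Seq[String]] = {
    val metadata = zk.brokerIds().map(zk.brokerMetadata)
    if (zk.topicExists(topic)) Left(s"Topic '$topic' already exists") else Right(metadata)
  }

  // Proposed ordering: return after a single read when the topic already exists.
  def existenceCheckedFirst(zk: ZkReads, topic: String): Either[String, Seq[String]] = {
    if (zk.topicExists(topic)) Left(s"Topic '$topic' already exists")
    else Right(zk.brokerIds().map(zk.brokerMetadata))
  }
}
```

With three brokers and an existing topic, the first ordering performs 5 reads (N+2) and the second performs 1.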
[jira] [Updated] (KAFKA-5310) reset ControllerContext during resignation
[ https://issues.apache.org/jira/browse/KAFKA-5310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-5310: Status: Patch Available (was: Open) > reset ControllerContext during resignation > -- > > Key: KAFKA-5310 > URL: https://issues.apache.org/jira/browse/KAFKA-5310 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > > This ticket is all about ControllerContext initialization and teardown. The > key points are: > 1. we should teardown ControllerContext during resignation instead of waiting > on election to fix it up. A heapdump shows that the former controller keeps > pretty much all of its ControllerContext state laying around. > 2. we don't properly teardown/reset > {{ControllerContext.partitionsBeingReassigned}}. This caused problems for us > in a production cluster at linkedin as shown in the scenario below: > {code} > > rm -rf /tmp/zookeeper/ /tmp/kafka-logs* logs* > > ./gradlew clean jar > > ./bin/zookeeper-server-start.sh config/zookeeper.properties > > export LOG_DIR=logs0 && ./bin/kafka-server-start.sh > > config/server0.properties > > export LOG_DIR=logs1 && ./bin/kafka-server-start.sh > > config/server1.properties > > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t > > --replica-assignment 1 > > ./bin/zookeeper-shell.sh localhost:2181 > get /brokers/topics/t > {"version":1,"partitions":{"0":[1]}} > create /admin/reassign_partitions > {"partitions":[{"topic":"t","partition":0,"replicas":[1,2]}],"version":1} > Created /admin/reassign_partitions > get /brokers/topics/t > {"version":1,"partitions":{"0":[1,2]}} > get /admin/reassign_partitions > {"version":1,"partitions":[{"topic":"t","partition":0,"replicas":[1,2]}]} > delete /admin/reassign_partitions > delete /controller > get /brokers/topics/t > {"version":1,"partitions":{"0":[1,2]}} > get /admin/reassign_partitions > Node does not exist: /admin/reassign_partitions > > echo > > 
'{"partitions":[{"topic":"t","partition":0,"replicas":[1]}],"version":1}' > > > reassignment.txt > > ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 > > --reassignment-json-file reassignment.txt --execute > get /brokers/topics/t > {"version":1,"partitions":{"0":[1]}} > get /admin/reassign_partitions > Node does not exist: /admin/reassign_partitions > delete /controller > get /brokers/topics/t > {"version":1,"partitions":{"0":[1,2]}} > get /admin/reassign_partitions > Node does not exist: /admin/reassign_partitions > {code} > Notice that the replica set goes from \[1\] to \[1,2\] (as expected with the > explicit {{/admin/reassign_partitions}} znode creation during the initial > controller) back to \[1\] (as expected with the partition reassignment during > the second controller) and again back to \[1,2\] after the original > controller gets re-elected. > That last transition from \[1\] to \[1,2\] is unexpected. It's due to the > original controller not resetting its > {{ControllerContext.partitionsBeingReassigned}} correctly. > {{initializePartitionReassignment}} simply adds to what's already in > {{ControllerContext.partitionsBeingReassigned}}. > The explicit {{/admin/reassign_partitions}} znode creation is to circumvent > KAFKA-5161 (95b48b157aca44beec4335e62a59f37097fe7499). Doing so is valid > since: > 1. our code in production doesn't have that change > 2. KAFKA-5161 doesn't address the underlying race condition between a broker > failure and the ReassignPartitionsCommand tool creating the znode. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (KAFKA-5310) reset ControllerContext during resignation
[ https://issues.apache.org/jira/browse/KAFKA-5310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-5310: Description:

This ticket is all about ControllerContext initialization and teardown. The key points are:
1. we should tear down ControllerContext during resignation instead of waiting on election to fix it up. A heapdump shows that the former controller keeps pretty much all of its ControllerContext state lying around.
2. we don't properly tear down/reset {{ControllerContext.partitionsBeingReassigned}}. This caused problems for us in a production cluster at LinkedIn as shown in the scenario below:
{code}
> rm -rf /tmp/zookeeper/ /tmp/kafka-logs* logs*
> ./gradlew clean jar
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
> export LOG_DIR=logs0 && ./bin/kafka-server-start.sh config/server0.properties
> export LOG_DIR=logs1 && ./bin/kafka-server-start.sh config/server1.properties
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t --replica-assignment 1
> ./bin/zookeeper-shell.sh localhost:2181
get /brokers/topics/t
{"version":1,"partitions":{"0":[1]}}
create /admin/reassign_partitions {"partitions":[{"topic":"t","partition":0,"replicas":[1,2]}],"version":1}
Created /admin/reassign_partitions
get /brokers/topics/t
{"version":1,"partitions":{"0":[1,2]}}
get /admin/reassign_partitions
{"version":1,"partitions":[{"topic":"t","partition":0,"replicas":[1,2]}]}
delete /admin/reassign_partitions
delete /controller
get /brokers/topics/t
{"version":1,"partitions":{"0":[1,2]}}
get /admin/reassign_partitions
Node does not exist: /admin/reassign_partitions
> echo '{"partitions":[{"topic":"t","partition":0,"replicas":[1]}],"version":1}' > reassignment.txt
> ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file reassignment.txt --execute
get /brokers/topics/t
{"version":1,"partitions":{"0":[1]}}
get /admin/reassign_partitions
Node does not exist: /admin/reassign_partitions
delete /controller
get /brokers/topics/t
{"version":1,"partitions":{"0":[1,2]}}
get /admin/reassign_partitions
Node does not exist: /admin/reassign_partitions
{code}
Notice that the replica set goes from \[1\] to \[1,2\] (as expected with the explicit {{/admin/reassign_partitions}} znode creation during the initial controller) back to \[1\] (as expected with the partition reassignment during the second controller) and again back to \[1,2\] after the original controller gets re-elected.

That last transition from \[1\] to \[1,2\] is unexpected. It's due to the original controller not resetting its {{ControllerContext.partitionsBeingReassigned}} correctly. {{initializePartitionReassignment}} simply adds to what's already in {{ControllerContext.partitionsBeingReassigned}}.

The explicit {{/admin/reassign_partitions}} znode creation is to circumvent KAFKA-5161 (95b48b157aca44beec4335e62a59f37097fe7499). Doing so is valid since:
1. our code in production doesn't have that change
2. KAFKA-5161 doesn't address the underlying race condition between a broker failure and the ReassignPartitionsCommand tool creating the znode.

was:
This ticket is all about ControllerContext initialization and teardown. The key points are:
1. we should tear down ControllerContext during resignation instead of waiting on election to fix it up. A heapdump shows that the former controller keeps pretty much all of its ControllerContext state lying around.
2. we don't properly tear down/reset `ControllerContext.partitionsBeingReassigned`. This caused problems for us in a production cluster at LinkedIn as shown in the scenario below:
{code}
> rm -rf /tmp/zookeeper/ /tmp/kafka-logs* logs*
> ./gradlew clean jar
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
> export LOG_DIR=logs0 && ./bin/kafka-server-start.sh config/server0.properties
> export LOG_DIR=logs1 && ./bin/kafka-server-start.sh config/server1.properties
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t --replica-assignment 1
> ./bin/zookeeper-shell.sh localhost:2181
get /brokers/topics/t
{"version":1,"partitions":{"0":[1]}}
create /admin/reassign_partitions {"partitions":[{"topic":"t","partition":0,"replicas":[1,2]}],"version":1}
Created /admin/reassign_partitions
get /brokers/topics/t
{"version":1,"partitions":{"0":[1,2]}}
get /admin/reassign_partitions
{"version":1,"partitions":[{"topic":"t","partition":0,"replicas":[1,2]}]}
delete /admin/reassign_partitions
delete /controller
get /brokers/topics/t
{"version":1,"partitions":{"0":[1,2]}}
get /admin/reassign_partitions
Node does not exist: /admin/reassign_partitions
> echo '{"partitions":[{"topic":"t","partition":0,"replicas":[1]}],"version":1}' > reassignment.txt
> ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file reassignment.txt --execute
get /brokers/topics/t
[jira] [Created] (KAFKA-5310) reset ControllerContext during resignation
Onur Karaman created KAFKA-5310: --- Summary: reset ControllerContext during resignation Key: KAFKA-5310 URL: https://issues.apache.org/jira/browse/KAFKA-5310 Project: Kafka Issue Type: Sub-task Reporter: Onur Karaman Assignee: Onur Karaman

This ticket is all about ControllerContext initialization and teardown. The key points are:
1. we should tear down ControllerContext during resignation instead of waiting on election to fix it up. A heapdump shows that the former controller keeps pretty much all of its ControllerContext state lying around.
2. we don't properly tear down/reset `ControllerContext.partitionsBeingReassigned`. This caused problems for us in a production cluster at LinkedIn as shown in the scenario below:
{code}
> rm -rf /tmp/zookeeper/ /tmp/kafka-logs* logs*
> ./gradlew clean jar
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
> export LOG_DIR=logs0 && ./bin/kafka-server-start.sh config/server0.properties
> export LOG_DIR=logs1 && ./bin/kafka-server-start.sh config/server1.properties
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t --replica-assignment 1
> ./bin/zookeeper-shell.sh localhost:2181
get /brokers/topics/t
{"version":1,"partitions":{"0":[1]}}
create /admin/reassign_partitions {"partitions":[{"topic":"t","partition":0,"replicas":[1,2]}],"version":1}
Created /admin/reassign_partitions
get /brokers/topics/t
{"version":1,"partitions":{"0":[1,2]}}
get /admin/reassign_partitions
{"version":1,"partitions":[{"topic":"t","partition":0,"replicas":[1,2]}]}
delete /admin/reassign_partitions
delete /controller
get /brokers/topics/t
{"version":1,"partitions":{"0":[1,2]}}
get /admin/reassign_partitions
Node does not exist: /admin/reassign_partitions
> echo '{"partitions":[{"topic":"t","partition":0,"replicas":[1]}],"version":1}' > reassignment.txt
> ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file reassignment.txt --execute
get /brokers/topics/t
{"version":1,"partitions":{"0":[1]}}
get /admin/reassign_partitions
Node does not exist: /admin/reassign_partitions
delete /controller
get /brokers/topics/t
{"version":1,"partitions":{"0":[1,2]}}
get /admin/reassign_partitions
Node does not exist: /admin/reassign_partitions
{code}
Notice that the replica set goes from \[1\] to \[1,2\] (as expected with the explicit `/admin/reassign_partitions` znode creation during the initial controller) back to \[1\] (as expected with the partition reassignment during the second controller) and again back to \[1,2\] after the original controller gets re-elected.

That last transition from \[1\] to \[1,2\] is unexpected. It's due to the original controller not resetting its `ControllerContext.partitionsBeingReassigned` correctly. `initializePartitionReassignment` simply adds to what's already in `ControllerContext.partitionsBeingReassigned`.

The explicit `/admin/reassign_partitions` znode creation is to circumvent KAFKA-5161 (95b48b157aca44beec4335e62a59f37097fe7499). Doing so is valid since:
1. our code in production doesn't have that change
2. KAFKA-5161 doesn't address the underlying race condition between a broker failure and the ReassignPartitionsCommand tool creating the znode.

It looks like this bug has been around for quite some time (definitely before 0.10.2).

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
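The stale-state problem described in this ticket can be modeled with a small sketch. It is a heavily simplified model under stated assumptions, not the controller's real code: `ControllerContextSketch` and `ControllerSketch` are hypothetical names, election is reduced to merging whatever reassignments are found in zookeeper into the in-memory map, and the `resetOnResignation` flag exists only to compare the two behaviors.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the controller's in-memory state; not Kafka's classes.
class ControllerContextSketch {
  // (topic, partition) -> target replicas of an in-flight reassignment
  val partitionsBeingReassigned = mutable.Map.empty[(String, Int), Seq[Int]]
}

class ControllerSketch(resetOnResignation: Boolean) {
  val context = new ControllerContextSketch

  // Mirrors the behavior described in the ticket: entries read from zookeeper
  // are added to whatever is already in the map.
  def onElection(reassignmentsInZk: Map[(String, Int), Seq[Int]]): Unit =
    context.partitionsBeingReassigned ++= reassignmentsInZk

  // The proposed change: drop in-memory state when the controller resigns.
  def onResignation(): Unit =
    if (resetOnResignation) context.partitionsBeingReassigned.clear()
}
```

Without the reset, a controller that is re-elected after the reassignment znode has been deleted still holds the old entry for partition 0 of topic t, which matches the unexpected transition back to \[1,2\] in the scenario above.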
[jira] [Commented] (KAFKA-4896) Offset loading can use more threads
[ https://issues.apache.org/jira/browse/KAFKA-4896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020447#comment-16020447 ] Onur Karaman commented on KAFKA-4896: -

handleGroupImmigration only gets called from handling a LeaderAndIsrRequest. Its cache load is synchronized over the ReplicaManager's replicaStateChangeLock.

handleGroupEmigration gets called in two places: when handling a LeaderAndIsrRequest as well as from StopReplicaRequest. Its cache removal is synchronized over the ReplicaManager's replicaStateChangeLock only in the LeaderAndIsrRequest handling path.

I think even with today's single scheduler thread, we might have an interleaved load and removal problem when the following are happening concurrently:
1. handleGroupImmigration is called for a partition P through LeaderAndIsrRequest
2. handleGroupEmigration is called for a partition P through StopReplicaRequest

This should be rare. Controller does a blocking send and receive, so I think this should only happen if:
1. one of the requests times out while the controller stays the same
2. the controller moves

[~junrao] does this edge case sound right?

> Offset loading can use more threads > --- > > Key: KAFKA-4896 > URL: https://issues.apache.org/jira/browse/KAFKA-4896 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 0.10.2.0 >Reporter: Jun Rao >Assignee: Abhishek Mendhekar > Labels: newbie > > Currently, in GroupMetadataManager, we have a single thread for loading the > offset cache. We could speed it up with more threads. > /* single-thread scheduler to handle offset/group metadata cache loading and > unloading */ > private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = > "group-metadata-manager-") -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (KAFKA-5258) move all partition and replica state transition rules into their states
[ https://issues.apache.org/jira/browse/KAFKA-5258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-5258: Description: Today the PartitionStateMachine and ReplicaStateMachine defines and asserts the valid state transitions inline for each state, looking something like: {code} private def handleStateChange(...) { targetState match { case stateA => { assertValidPreviousStates(topicAndPartition, List(stateX, stateY, stateZ), stateA) // actual work } case stateB => { assertValidPreviousStates(topicAndPartition, List(stateD, stateE), stateB) // actual work } } } {code} It would be cleaner to move all partition and replica state transition rules into their states and simply do the assertion at the top of the handleStateChange method like so: {code} private def handleStateChange(...) { assertValidTransition(targetState) targetState match { case stateA => { // actual work } case stateB => { // actual work } } } sealed trait State { def state: Byte def validPreviousStates: Set[State] } case object StateA extends State { val state: Byte = 1 val validPreviousStates: Set[State] = Set(StateX) } case object StateB extends State { val state: Byte = 2 val validPreviousStates: Set[State] = Set(StateX, StateY, StateZ) } {code} was: Today the PartitionStateMachine and ReplicaStateMachine defines and asserts the valid state transitions inline for each state, looking something like: {code} private def handleStateChange(...) { targetState match { case stateA => { assertValidPreviousStates(topicAndPartition, List(stateX, stateY, stateZ), stateA) // actual work } case stateB => { assertValidPreviousStates(topicAndPartition, List(stateD, stateE), stateB) // actual work } } } {code} It would be cleaner to move all partition and replica state transition rules into their and simply do the assertion at the top of the handleStateChange method like so: {code} private def handleStateChange(...) 
{ assertValidTransition(targetState) targetState match { case stateA => { // actual work } case stateB => { // actual work } } } sealed trait State { def state: Byte def validPreviousStates: Set[State] } case object StateA extends State { val state: Byte = 1 val validPreviousStates: Set[State] = Set(StateX) } case object StateB extends State { val state: Byte = 2 val validPreviousStates: Set[State] = Set(StateX, StateY, StateZ) } {code} > move all partition and replica state transition rules into their states > --- > > Key: KAFKA-5258 > URL: https://issues.apache.org/jira/browse/KAFKA-5258 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman >Priority: Minor > > Today the PartitionStateMachine and ReplicaStateMachine defines and asserts > the valid state transitions inline for each state, looking something like: > {code} > private def handleStateChange(...) { > targetState match { > case stateA => { > assertValidPreviousStates(topicAndPartition, List(stateX, stateY, > stateZ), stateA) > // actual work > } > case stateB => { > assertValidPreviousStates(topicAndPartition, List(stateD, stateE), > stateB) > // actual work > } > } > } > {code} > It would be cleaner to move all partition and replica state transition rules > into their states and simply do the assertion at the top of the > handleStateChange method like so: > {code} > private def handleStateChange(...) { > assertValidTransition(targetState) > targetState match { > case stateA => { > // actual work > } > case stateB => { > // actual work > } > } > } > sealed trait State { > def state: Byte > def validPreviousStates: Set[State] > } > case object StateA extends State { > val state: Byte = 1 > val validPreviousStates: Set[State] = Set(StateX) > } > case object StateB extends State { > val state: Byte = 2 > val validPreviousStates: Set[State] = Set(StateX, StateY, StateZ) > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
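The "rules live in the states" proposal above can be made runnable as a small sketch. The concrete states, their byte values and their transition rules are illustrative assumptions, and `StateMachineSketch` is a hypothetical name; the real partition and replica state machines define their own states.

```scala
sealed trait State {
  def state: Byte
  def validPreviousStates: Set[State]
}

// The rules are kept acyclic here to sidestep object-initialization-order
// concerns; mutually reachable states would likely need `def` or `lazy val`.
case object StateX extends State {
  val state: Byte = 0
  val validPreviousStates: Set[State] = Set.empty
}
case object StateA extends State {
  val state: Byte = 1
  val validPreviousStates: Set[State] = Set(StateX)
}
case object StateB extends State {
  val state: Byte = 2
  val validPreviousStates: Set[State] = Set(StateX, StateA)
}

object StateMachineSketch {
  // Single assertion shared by every target state.
  def assertValidTransition(current: State, target: State): Unit =
    if (!target.validPreviousStates.contains(current))
      throw new IllegalStateException(s"Illegal transition from $current to $target")

  def handleStateChange(current: State, target: State): State = {
    assertValidTransition(current, target)
    target match {
      case StateX => // actual work
      case StateA => // actual work
      case StateB => // actual work
    }
    target
  }
}
```

Because `State` is sealed, the compiler can also warn when a `match` over target states misses a case, which the inline `List(...)` assertions do not give you.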
[jira] [Updated] (KAFKA-5258) move all partition and replica state transition rules into their states
[ https://issues.apache.org/jira/browse/KAFKA-5258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-5258: Description: Today the PartitionStateMachine and ReplicaStateMachine defines and asserts the valid state transitions inline for each state, looking something like: {code} private def handleStateChange(...) { targetState match { case stateA => { assertValidPreviousStates(topicAndPartition, List(stateX, stateY, stateZ), stateA) // actual work } case stateB => { assertValidPreviousStates(topicAndPartition, List(stateD, stateE), stateB) // actual work } } } {code} It would be cleaner to move all partition and replica state transition rules into their and simply do the assertion at the top of the handleStateChange method like so: {code} private def handleStateChange(...) { assertValidTransition(targetState) targetState match { case stateA => { // actual work } case stateB => { // actual work } } } sealed trait State { def state: Byte def validPreviousStates: Set[State] } case object StateA extends State { val state: Byte = 1 val validPreviousStates: Set[State] = Set(StateX) } case object StateB extends State { val state: Byte = 2 val validPreviousStates: Set[State] = Set(StateX, StateY, StateZ) } {code} was: Today the PartitionStateMachine and ReplicaStateMachine defines and asserts the valid state transitions inline for each state, looking something like: {code} private def handleStateChange(...) { targetState match { case stateA => { assertValidPreviousStates(topicAndPartition, List(stateX, stateY, stateZ), stateA) // actual work } case stateB => { assertValidPreviousStates(topicAndPartition, List(stateD, stateE), stateB) // actual work } } } {code} It would be cleaner to move all partition and replica state transition rules into a map and simply do the assertion at the top of the handleStateChange method like so: {code} private val validPreviousStates: Map[State, Set[State]] = ... private def handleStateChange(...) 
{ assertValidTransition(targetState) targetState match { case stateA => { // actual work } case stateB => { // actual work } } } {code} > move all partition and replica state transition rules into their states > --- > > Key: KAFKA-5258 > URL: https://issues.apache.org/jira/browse/KAFKA-5258 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman >Priority: Minor > > Today the PartitionStateMachine and ReplicaStateMachine defines and asserts > the valid state transitions inline for each state, looking something like: > {code} > private def handleStateChange(...) { > targetState match { > case stateA => { > assertValidPreviousStates(topicAndPartition, List(stateX, stateY, > stateZ), stateA) > // actual work > } > case stateB => { > assertValidPreviousStates(topicAndPartition, List(stateD, stateE), > stateB) > // actual work > } > } > } > {code} > It would be cleaner to move all partition and replica state transition rules > into their and simply do the assertion at the top of the handleStateChange > method like so: > {code} > private def handleStateChange(...) { > assertValidTransition(targetState) > targetState match { > case stateA => { > // actual work > } > case stateB => { > // actual work > } > } > } > sealed trait State { > def state: Byte > def validPreviousStates: Set[State] > } > case object StateA extends State { > val state: Byte = 1 > val validPreviousStates: Set[State] = Set(StateX) > } > case object StateB extends State { > val state: Byte = 2 > val validPreviousStates: Set[State] = Set(StateX, StateY, StateZ) > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (KAFKA-5258) move all partition and replica state transition rules into their states
[ https://issues.apache.org/jira/browse/KAFKA-5258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-5258: Summary: move all partition and replica state transition rules into their states (was: move all partition and replica state transition rules into a map) > move all partition and replica state transition rules into their states > --- > > Key: KAFKA-5258 > URL: https://issues.apache.org/jira/browse/KAFKA-5258 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman >Priority: Minor > > Today the PartitionStateMachine and ReplicaStateMachine defines and asserts > the valid state transitions inline for each state, looking something like: > {code} > private def handleStateChange(...) { > targetState match { > case stateA => { > assertValidPreviousStates(topicAndPartition, List(stateX, stateY, > stateZ), stateA) > // actual work > } > case stateB => { > assertValidPreviousStates(topicAndPartition, List(stateD, stateE), > stateB) > // actual work > } > } > } > {code} > It would be cleaner to move all partition and replica state transition rules > into a map and simply do the assertion at the top of the handleStateChange > method like so: > {code} > private val validPreviousStates: Map[State, Set[State]] = ... > private def handleStateChange(...) { > assertValidTransition(targetState) > targetState match { > case stateA => { > // actual work > } > case stateB => { > // actual work > } > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-5261) Performance improvement of SimpleAclAuthorizer
[ https://issues.apache.org/jira/browse/KAFKA-5261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16013665#comment-16013665 ] Onur Karaman commented on KAFKA-5261: - Would one alternative be to just make a CachedAclAuthorizer which decorates the SimpleAclAuthorizer with a cache? > Performance improvement of SimpleAclAuthorizer > -- > > Key: KAFKA-5261 > URL: https://issues.apache.org/jira/browse/KAFKA-5261 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.10.2.1 >Reporter: Stephane Maarek > > Currently, looking at the KafkaApis class, it seems that every request going > through Kafka is also going through an authorize check: > {code} > private def authorize(session: Session, operation: Operation, resource: > Resource): Boolean = > authorizer.forall(_.authorize(session, operation, resource)) > {code} > The SimpleAclAuthorizer logic runs through checks which all look to be done > in linear time (except on first run) proportional to the number of acls on a > specific resource. This operation is re-run every time a client tries to use > a Kafka Api, especially on the very often called `handleProducerRequest` and > `handleFetchRequest` > I believe a cache could be built to store the result of the authorize call, > possibly allowing more expensive authorize() calls to happen, and reducing > greatly the CPU usage in the long run. The cache would be invalidated every > time a change happens to aclCache > Thoughts before I try giving it a go with a PR? -- This message was sent by Atlassian JIRA (v6.3.15#6346)
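A minimal sketch of the decorator idea suggested in the comment above. `ToyAuthorizer` is a hypothetical, simplified interface using plain strings; Kafka's real `Authorizer` trait and `SimpleAclAuthorizer` have more methods and different parameter types, so this shows the shape of the approach rather than a drop-in class.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical, simplified authorizer interface (not Kafka's Authorizer trait).
trait ToyAuthorizer {
  def authorize(principal: String, operation: String, resource: String): Boolean
}

// Decorator that memoizes the decisions of the wrapped authorizer.
class CachedToyAuthorizer(underlying: ToyAuthorizer) extends ToyAuthorizer {
  private val cache = TrieMap.empty[(String, String, String), Boolean]

  override def authorize(principal: String, operation: String, resource: String): Boolean =
    cache.getOrElseUpdate(
      (principal, operation, resource),
      underlying.authorize(principal, operation, resource))

  // Must be called whenever the ACLs change; otherwise stale decisions are served.
  def invalidate(): Unit = cache.clear()
}
```

The cache here is unbounded and is cleared wholesale on any ACL change, which matches the invalidation rule proposed in the issue; a bounded or per-resource cache would be a separate design decision.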
[jira] [Updated] (KAFKA-5258) move all partition and replica state transition rules into a map
[ https://issues.apache.org/jira/browse/KAFKA-5258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-5258: Status: Patch Available (was: In Progress) > move all partition and replica state transition rules into a map > > > Key: KAFKA-5258 > URL: https://issues.apache.org/jira/browse/KAFKA-5258 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman >Priority: Minor > > Today the PartitionStateMachine and ReplicaStateMachine defines and asserts > the valid state transitions inline for each state, looking something like: > {code} > private def handleStateChange(...) { > targetState match { > case stateA => { > assertValidPreviousStates(topicAndPartition, List(stateX, stateY, > stateZ), stateA) > // actual work > } > case stateB => { > assertValidPreviousStates(topicAndPartition, List(stateD, stateE), > stateB) > // actual work > } > } > } > {code} > It would be cleaner to move all partition and replica state transition rules > into a map and simply do the assertion at the top of the handleStateChange > method like so: > {code} > private val validPreviousStates: Map[State, Set[State]] = ... > private def handleStateChange(...) { > assertValidTransition(targetState) > targetState match { > case stateA => { > // actual work > } > case stateB => { > // actual work > } > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Work started] (KAFKA-5258) move all partition and replica state transition rules into a map
[ https://issues.apache.org/jira/browse/KAFKA-5258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-5258 started by Onur Karaman. --- > move all partition and replica state transition rules into a map > > > Key: KAFKA-5258 > URL: https://issues.apache.org/jira/browse/KAFKA-5258 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman >Priority: Minor > > Today the PartitionStateMachine and ReplicaStateMachine defines and asserts > the valid state transitions inline for each state, looking something like: > {code} > private def handleStateChange(...) { > targetState match { > case stateA => { > assertValidPreviousStates(topicAndPartition, List(stateX, stateY, > stateZ), stateA) > // actual work > } > case stateB => { > assertValidPreviousStates(topicAndPartition, List(stateD, stateE), > stateB) > // actual work > } > } > } > {code} > It would be cleaner to move all partition and replica state transition rules > into a map and simply do the assertion at the top of the handleStateChange > method like so: > {code} > private val validPreviousStates: Map[State, Set[State]] = ... > private def handleStateChange(...) { > assertValidTransition(targetState) > targetState match { > case stateA => { > // actual work > } > case stateB => { > // actual work > } > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-5258) move all partition and replica state transition rules into a map
Onur Karaman created KAFKA-5258:
---
Summary: move all partition and replica state transition rules into a map
Key: KAFKA-5258
URL: https://issues.apache.org/jira/browse/KAFKA-5258
Project: Kafka
Issue Type: Sub-task
Reporter: Onur Karaman
Assignee: Onur Karaman
Priority: Minor

Today the PartitionStateMachine and ReplicaStateMachine define and assert the valid state transitions inline for each state, looking something like:
{code}
private def handleStateChange(...) {
  targetState match {
    case stateA => {
      assertValidPreviousStates(topicAndPartition, List(stateX, stateY, stateZ), stateA)
      // actual work
    }
    case stateB => {
      assertValidPreviousStates(topicAndPartition, List(stateD, stateE), stateB)
      // actual work
    }
  }
}
{code}
It would be cleaner to move all partition and replica state transition rules into a map and simply do the assertion at the top of the handleStateChange method like so:
{code}
private val validPreviousStates: Map[State, Set[State]] = ...
private def handleStateChange(...) {
  assertValidTransition(targetState)
  targetState match {
    case stateA => {
      // actual work
    }
    case stateB => {
      // actual work
    }
  }
}
{code}
-- This message was sent by Atlassian JIRA (v6.3.15#6346)
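The map-based variant proposed above could look roughly like the following. The states and the rules in the map are illustrative assumptions, and `TransitionMapSketch` and `isValidTransition` are hypothetical names rather than anything from the Kafka code base.

```scala
object TransitionMapSketch {
  sealed trait State
  case object StateA extends State
  case object StateB extends State
  case object StateX extends State
  case object StateY extends State

  // Illustrative rules only: target state -> states it may be entered from.
  // The explicit Set[State] keeps the invariant Set from being inferred too narrowly.
  private val validPreviousStates: Map[State, Set[State]] = Map(
    StateA -> Set[State](StateX, StateY),
    StateB -> Set[State](StateA)
  )

  // A target with no entry in the map accepts no previous state.
  def isValidTransition(current: State, target: State): Boolean =
    validPreviousStates.getOrElse(target, Set.empty[State]).contains(current)
}
```

Compared with the inline assertions, all rules sit in one table that can be read or tested in isolation; the trade-off is that a state added later can be forgotten in the map without any compile-time complaint.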
[jira] [Comment Edited] (KAFKA-5175) Transient failure: ControllerIntegrationTest.testPreferredReplicaLeaderElection
[ https://issues.apache.org/jira/browse/KAFKA-5175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011851#comment-16011851 ]

Onur Karaman edited comment on KAFKA-5175 at 5/16/17 7:01 AM:
--
Cool. I can steadily reproduce this by inserting a long sleep in Partition.maybeExpandIsr:
{code}
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 1d13689..b811d31 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -281,6 +281,7 @@ class Partition(val topic: String,
   def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult): Boolean = {
     inWriteLock(leaderIsrUpdateLock) {
       // check if this replica needs to be added to the ISR
+      Thread.sleep(10)
       leaderReplicaIfLocal match {
         case Some(leaderReplica) =>
           val replica = getReplica(replicaId).get
{code}
This makes me think that the controller is processing the preferred replica leader election before the restarted broker (the preferred replica leader) has joined isr, causing preferred replica leader election to fail and for the final zookeeper state validation to fail.
{code}
kafka.controller.ControllerIntegrationTest > testPreferredReplicaLeaderElection FAILED
    java.lang.AssertionError: failed to get expected partition state upon broker startup
        at kafka.utils.TestUtils$.fail(TestUtils.scala:323)
        at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:823)
        at kafka.controller.ControllerIntegrationTest.waitForPartitionState(ControllerIntegrationTest.scala:291)
        at kafka.controller.ControllerIntegrationTest.testPreferredReplicaLeaderElection(ControllerIntegrationTest.scala:204)
{code}

was (Author: onurkaraman):
Cool. I can steadily reproduce this by inserting a long sleep in Partition.maybeExpandIsr:
{code}
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 1d13689..b811d31 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -281,6 +281,7 @@ class Partition(val topic: String,
   def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult): Boolean = {
     inWriteLock(leaderIsrUpdateLock) {
       // check if this replica needs to be added to the ISR
+      Thread.sleep(10)
       leaderReplicaIfLocal match {
         case Some(leaderReplica) =>
           val replica = getReplica(replicaId).get
{code}
This makes me think that the controller is processing the preferred replica leader election before the restarted broker (the preferred replica leader) has joined isr, causing preferred replica leader election to fail and for the final zookeeper state validation to fail.

> Transient failure: ControllerIntegrationTest.testPreferredReplicaLeaderElection
> ---
>
> Key: KAFKA-5175
> URL: https://issues.apache.org/jira/browse/KAFKA-5175
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Ismael Juma
> Assignee: Onur Karaman
>
> {code}
> java.lang.AssertionError: failed to get expected partition state upon broker startup
>   at kafka.utils.TestUtils$.fail(TestUtils.scala:311)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:811)
>   at kafka.controller.ControllerIntegrationTest.waitForPartitionState(ControllerIntegrationTest.scala:293)
>   at kafka.controller.ControllerIntegrationTest.testPreferredReplicaLeaderElection(ControllerIntegrationTest.scala:211)
> {code}
> https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3497/testReport/kafka.controller/ControllerIntegrationTest/testPreferredReplicaLeaderElection/

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-5175) Transient failure: ControllerIntegrationTest.testPreferredReplicaLeaderElection
[ https://issues.apache.org/jira/browse/KAFKA-5175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011851#comment-16011851 ]

Onur Karaman commented on KAFKA-5175:
-
Cool. I can steadily reproduce this by inserting a long sleep in Partition.maybeExpandIsr:
{code}
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 1d13689..b811d31 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -281,6 +281,7 @@ class Partition(val topic: String,
   def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult): Boolean = {
     inWriteLock(leaderIsrUpdateLock) {
       // check if this replica needs to be added to the ISR
+      Thread.sleep(10)
       leaderReplicaIfLocal match {
         case Some(leaderReplica) =>
           val replica = getReplica(replicaId).get
{code}
This makes me think that the controller is processing the preferred replica leader election before the restarted broker (the preferred replica leader) has joined the ISR, causing the preferred replica leader election to fail and, in turn, the final zookeeper state validation to fail.
> Transient failure: > ControllerIntegrationTest.testPreferredReplicaLeaderElection > --- > > Key: KAFKA-5175 > URL: https://issues.apache.org/jira/browse/KAFKA-5175 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Assignee: Onur Karaman > > {code} > java.lang.AssertionError: failed to get expected partition state upon broker > startup > at kafka.utils.TestUtils$.fail(TestUtils.scala:311) > at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:811) > at > kafka.controller.ControllerIntegrationTest.waitForPartitionState(ControllerIntegrationTest.scala:293) > at > kafka.controller.ControllerIntegrationTest.testPreferredReplicaLeaderElection(ControllerIntegrationTest.scala:211) > {code} > https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3497/testReport/kafka.controller/ControllerIntegrationTest/testPreferredReplicaLeaderElection/ -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-5175) Transient failure: ControllerIntegrationTest.testPreferredReplicaLeaderElection
[ https://issues.apache.org/jira/browse/KAFKA-5175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011500#comment-16011500 ] Onur Karaman commented on KAFKA-5175: - Thanks [~ijuma]. I've been staring at this and couldn't figure it out yet. > Transient failure: > ControllerIntegrationTest.testPreferredReplicaLeaderElection > --- > > Key: KAFKA-5175 > URL: https://issues.apache.org/jira/browse/KAFKA-5175 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Assignee: Onur Karaman > > {code} > java.lang.AssertionError: failed to get expected partition state upon broker > startup > at kafka.utils.TestUtils$.fail(TestUtils.scala:311) > at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:811) > at > kafka.controller.ControllerIntegrationTest.waitForPartitionState(ControllerIntegrationTest.scala:293) > at > kafka.controller.ControllerIntegrationTest.testPreferredReplicaLeaderElection(ControllerIntegrationTest.scala:211) > {code} > https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3497/testReport/kafka.controller/ControllerIntegrationTest/testPreferredReplicaLeaderElection/ -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-3356) Remove ConsumerOffsetChecker, deprecated in 0.9, in 0.11
[ https://issues.apache.org/jira/browse/KAFKA-3356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010823#comment-16010823 ] Onur Karaman commented on KAFKA-3356: - I agree with [~jeffwidman]. > Remove ConsumerOffsetChecker, deprecated in 0.9, in 0.11 > > > Key: KAFKA-3356 > URL: https://issues.apache.org/jira/browse/KAFKA-3356 > Project: Kafka > Issue Type: Bug > Components: tools >Affects Versions: 0.10.2.0 >Reporter: Ashish Singh >Assignee: Mickael Maison >Priority: Blocker > Fix For: 0.11.0.0 > > > ConsumerOffsetChecker is marked deprecated as of 0.9, should be removed in > 0.11. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (KAFKA-5180) Transient failure: ControllerIntegrationTest.testControllerMoveIncrementsControllerEpoch
[ https://issues.apache.org/jira/browse/KAFKA-5180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-5180: Status: Patch Available (was: In Progress) > Transient failure: > ControllerIntegrationTest.testControllerMoveIncrementsControllerEpoch > > > Key: KAFKA-5180 > URL: https://issues.apache.org/jira/browse/KAFKA-5180 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Assignee: Onur Karaman > > {code} > Stacktrace org.I0Itec.zkclient.exception.ZkNoNodeException: > org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = > NoNode for /controller_epoch at > org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47) > at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:1001) > at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1100)at > org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1095)at > kafka.utils.ZkUtils.readData(ZkUtils.scala:652) at > kafka.controller.ControllerIntegrationTest.$anonfun$testControllerMoveIncrementsControllerEpoch$2(ControllerIntegrationTest.scala:68) > at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:808)at > kafka.controller.ControllerIntegrationTest.testControllerMoveIncrementsControllerEpoch(ControllerIntegrationTest.scala:68) > {code} > cc [~onurkaraman] > https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk8/detail/kafka-trunk-jdk8/1487/tests -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Work started] (KAFKA-5180) Transient failure: ControllerIntegrationTest.testControllerMoveIncrementsControllerEpoch
[ https://issues.apache.org/jira/browse/KAFKA-5180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-5180 started by Onur Karaman. --- > Transient failure: > ControllerIntegrationTest.testControllerMoveIncrementsControllerEpoch > > > Key: KAFKA-5180 > URL: https://issues.apache.org/jira/browse/KAFKA-5180 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Assignee: Onur Karaman > > {code} > Stacktrace org.I0Itec.zkclient.exception.ZkNoNodeException: > org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = > NoNode for /controller_epoch at > org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47) > at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:1001) > at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1100)at > org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1095)at > kafka.utils.ZkUtils.readData(ZkUtils.scala:652) at > kafka.controller.ControllerIntegrationTest.$anonfun$testControllerMoveIncrementsControllerEpoch$2(ControllerIntegrationTest.scala:68) > at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:808)at > kafka.controller.ControllerIntegrationTest.testControllerMoveIncrementsControllerEpoch(ControllerIntegrationTest.scala:68) > {code} > cc [~onurkaraman] > https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk8/detail/kafka-trunk-jdk8/1487/tests -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (KAFKA-5120) Several controller metrics block if controller lock is held by another thread
[ https://issues.apache.org/jira/browse/KAFKA-5120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman resolved KAFKA-5120. - Resolution: Fixed KAFKA-5028 has been checked in so this should no longer be an issue. > Several controller metrics block if controller lock is held by another thread > - > > Key: KAFKA-5120 > URL: https://issues.apache.org/jira/browse/KAFKA-5120 > Project: Kafka > Issue Type: Bug > Components: controller, metrics >Affects Versions: 0.10.2.0 >Reporter: Tim Carey-Smith >Priority: Minor > > We have been tracking latency issues surrounding queries to Controller > MBeans. Upon digging into the root causes, we discovered that several metrics > acquire the controller lock within the gauge. > The affected metrics are: > * {{ActiveControllerCount}} > * {{OfflinePartitionsCount}} > * {{PreferredReplicaImbalanceCount}} > If the controller is currently holding the lock and a MBean request is > received, the thread executing the request will block until the controller > releases the lock. > We discovered this in a cluster where the controller was holding the lock for > extended periods of time for normal operations. We have documented this issue > in KAFKA-5116. > Several possible solutions exist: > * Remove the lock from inside these {{Gauge}} s. > * Store and update the metric values in {{AtomicLong}} s. > Modifying the {{ActiveControllerCount}} metric seems to be straight-forward > while the other 2 metrics seem to be more involved. > We're happy to contribute a patch, but wanted to discuss potential solutions > and their tradeoffs before proceeding. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
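For reference, the second option listed in the issue above (keeping metric values in `AtomicLong`s) can be sketched as follows. `ToyControllerStats` and its method names are hypothetical, and this is not how KAFKA-5028 resolved the problem; it only illustrates why a gauge backed by an atomic value never has to wait for the controller lock.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical sketch: the controller publishes the value after each state
// change, and the gauge only reads it.
class ToyControllerStats {
  private val offlinePartitions = new AtomicLong(0L)

  // Called by the controller thread, possibly while it holds its own lock.
  def updateOfflinePartitionsCount(count: Long): Unit = offlinePartitions.set(count)

  // Called by the metrics/MBean thread; a lock-free read that cannot block.
  def offlinePartitionsCount: Long = offlinePartitions.get()
}
```

The value a reader sees may lag the controller's in-memory state slightly, which is usually an acceptable trade for a monitoring metric.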
[jira] [Resolved] (KAFKA-3107) Error when trying to shut down auto balancing scheduler of controller
[ https://issues.apache.org/jira/browse/KAFKA-3107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman resolved KAFKA-3107. - Resolution: Fixed This problem should no longer exist after KAFKA-5028. > Error when trying to shut down auto balancing scheduler of controller > - > > Key: KAFKA-3107 > URL: https://issues.apache.org/jira/browse/KAFKA-3107 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8.2.1 >Reporter: Flavio Junqueira > > We observed the following exception when a controller was shutting down: > {noformat} > [run] Error handling event ZkEvent[New session event sent to > kafka.controller.KafkaController$SessionExpirationListener@3278c211] > java.lang.IllegalStateException: Kafka scheduler has not been started > at kafka.utils.KafkaScheduler.ensureStarted(KafkaScheduler.scala:114) > at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:86) > at > kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:350) > at > kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1108) > at > kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1107) > at > kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1107) > at kafka.utils.Utils$.inLock(Utils.scala:535) > at > kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1107) > at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472) > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) > {noformat} > The scheduler should have been started. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (KAFKA-5197) add a tool analyzing zookeeper client performance across its various apis
[ https://issues.apache.org/jira/browse/KAFKA-5197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-5197: Status: Patch Available (was: In Progress) > add a tool analyzing zookeeper client performance across its various apis > - > > Key: KAFKA-5197 > URL: https://issues.apache.org/jira/browse/KAFKA-5197 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > > The raw zookeeper client offers various means of getting and setting znodes. > It would be useful to have a tool that lets you analyze the performance of > these apis. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Work started] (KAFKA-5197) add a tool analyzing zookeeper client performance across its various apis
[ https://issues.apache.org/jira/browse/KAFKA-5197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-5197 started by Onur Karaman. --- > add a tool analyzing zookeeper client performance across its various apis > - > > Key: KAFKA-5197 > URL: https://issues.apache.org/jira/browse/KAFKA-5197 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > > The raw zookeeper client offers various means of getting and setting znodes. > It would be useful to have a tool that lets you analyze the performance of > these apis. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-5197) add a tool analyzing zookeeper client performance across its various apis
Onur Karaman created KAFKA-5197: --- Summary: add a tool analyzing zookeeper client performance across its various apis Key: KAFKA-5197 URL: https://issues.apache.org/jira/browse/KAFKA-5197 Project: Kafka Issue Type: Sub-task Reporter: Onur Karaman Assignee: Onur Karaman The raw zookeeper client offers various means of getting and setting znodes. It would be useful to have a tool that lets you analyze the performance of these apis. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-5099) Replica Deletion Regression from KIP-101
[ https://issues.apache.org/jira/browse/KAFKA-5099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16000261#comment-16000261 ] Onur Karaman commented on KAFKA-5099: - By the way, I reran the file descriptor experiment against both my patch and git hash b611cfa5c0f4941a491781424afd9b699bdb894e (the commit before KIP-101: 0baea2ac13532981f3fea11e5dfc6da5aafaeaa8) on both mac and linux. The file descriptors stuck around for over 10 minutes after the index files had been removed, so I gave up waiting. > Replica Deletion Regression from KIP-101 > > > Key: KAFKA-5099 > URL: https://issues.apache.org/jira/browse/KAFKA-5099 > Project: Kafka > Issue Type: Bug >Reporter: Onur Karaman >Assignee: Onur Karaman >Priority: Blocker > Fix For: 0.11.0.0 > > > It appears that replica deletion regressed from KIP-101. Replica deletion > happens when a broker receives a StopReplicaRequest with delete=true. Ever > since KAFKA-1911, replica deletion has been async, meaning the broker > responds with a StopReplicaResponse simply after marking the replica > directory as staged for deletion. This marking happens by moving a data log > directory and its contents such as /tmp/kafka-logs1/t1-0 to a marked > directory like /tmp/kafka-logs1/t1-0.8c9c4c0c61c44cc59ebeb00075a2a07f-delete, > acting as a soft-delete. A scheduled thread later actually deletes the data. > It appears that the regression occurs while the scheduled thread is actually > trying to delete the data, which means the controller considers operations > such as partition reassignment and topic deletion complete. But if you look > at the log4j logs and data logs, you'll find that the soft-deleted data logs > haven't actually been deleted and won't get deleted. It seems that restarting the broker > actually allows for the soft-deleted directories to get deleted. 
> Here's the setup: > {code} > > ./bin/zookeeper-server-start.sh config/zookeeper.properties > > export LOG_DIR=logs0 && ./bin/kafka-server-start.sh > > config/server0.properties > > export LOG_DIR=logs1 && ./bin/kafka-server-start.sh > > config/server1.properties > > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t0 > > --replica-assignment 1:0 > > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t1 > > --replica-assignment 1:0 > > ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic t0 > > cat p.txt > {"partitions": > [ > {"topic": "t1", "partition": 0, "replicas": [0] } > ], > "version":1 > } > > ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 > > --reassignment-json-file p.txt --execute > {code} > Here are sample logs: > {code} > [2017-04-20 17:46:54,801] INFO [ReplicaFetcherManager on broker 1] Removed > fetcher for partitions t0-0 (kafka.server.ReplicaFetcherManager) > [2017-04-20 17:46:54,814] INFO Log for partition t0-0 is renamed to > /tmp/kafka-logs1/t0-0.bbc8fa126e3e4ff787f6b68d158ab771-delete and is > scheduled for deletion (kafka.log.LogManager) > [2017-04-20 17:47:27,585] INFO Deleting index > /tmp/kafka-logs1/t0-0.bbc8fa126e3e4ff787f6b68d158ab771-delete/.index > (kafka.log.OffsetIndex) > [2017-04-20 17:47:27,586] INFO Deleting index > /tmp/kafka-logs1/t0-0/.timeindex (kafka.log.TimeIndex) > [2017-04-20 17:47:27,587] ERROR Exception in deleting > Log(/tmp/kafka-logs1/t0-0.bbc8fa126e3e4ff787f6b68d158ab771-delete). Moving it > to the end of the queue. 
(kafka.log.LogManager) > java.io.FileNotFoundException: > /tmp/kafka-logs1/t0-0/leader-epoch-checkpoint.tmp (No such file or directory) > at java.io.FileOutputStream.open0(Native Method) > at java.io.FileOutputStream.open(FileOutputStream.java:270) > at java.io.FileOutputStream.<init>(FileOutputStream.java:213) > at java.io.FileOutputStream.<init>(FileOutputStream.java:162) > at kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:41) > at > kafka.server.checkpoints.LeaderEpochCheckpointFile.write(LeaderEpochCheckpointFile.scala:61) > at > kafka.server.epoch.LeaderEpochFileCache.kafka$server$epoch$LeaderEpochFileCache$$flush(LeaderEpochFileCache.scala:178) > at > kafka.server.epoch.LeaderEpochFileCache$$anonfun$clear$1.apply$mcV$sp(LeaderEpochFileCache.scala:161) > at > kafka.server.epoch.LeaderEpochFileCache$$anonfun$clear$1.apply(LeaderEpochFileCache.scala:159) > at > kafka.server.epoch.LeaderEpochFileCache$$anonfun$clear$1.apply(LeaderEpochFileCache.scala:159) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213) > at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221) > at > kafka.server.epoch.LeaderEpochFileCache.clear(LeaderEpochFileCache.scala:159) > at kafka.log.Log.delete(Log.scala:1051) > at >
[jira] [Commented] (KAFKA-5099) Replica Deletion Regression from KIP-101
[ https://issues.apache.org/jira/browse/KAFKA-5099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15999200#comment-15999200 ] Onur Karaman commented on KAFKA-5099: - Hi [~junrao]. Thanks for figuring it out. I gave your solution a shot and it does seem to delete the data now. However, I noticed something which I'm not sure is normal. While the files get deleted successfully now, it looks like the process holds on to the file descriptors corresponding to the index and timeindex files in their soft-delete directories for a few minutes. Is this somehow related to them being mmap'd? So there's a window of time where I see: {code} > l /tmp/kafka-logs*/t* ls: /tmp/kafka-logs*/t*: No such file or directory > lsof -a -p 42214 | grep "/tmp/kafka-logs" java 42214 okaraman txt REG 1,4 0 86940741 /private/tmp/kafka-logs0/t0-0.f87568e7c6db40c7a898719376a79f8b-delete/.index java 42214 okaraman txt REG 1,4 0 86940744 /private/tmp/kafka-logs0/t0-0.f87568e7c6db40c7a898719376a79f8b-delete/.timeindex java 42214 okaraman 123u REG 1,4 0 86940706 /private/tmp/kafka-logs0/.lock {code} The file descriptors go away after a few minutes: {code} > l /tmp/kafka-logs*/t* ls: /tmp/kafka-logs*/t*: No such file or directory > lsof -a -p 42214 | grep "/tmp/kafka-logs" java 42214 okaraman 123u REG 1,4 0 86940706 /private/tmp/kafka-logs0/.lock {code} > Replica Deletion Regression from KIP-101 > > > Key: KAFKA-5099 > URL: https://issues.apache.org/jira/browse/KAFKA-5099 > Project: Kafka > Issue Type: Bug >Reporter: Onur Karaman >Assignee: Onur Karaman >Priority: Blocker > Fix For: 0.11.0.0 > > > It appears that replica deletion regressed from KIP-101. Replica deletion > happens when a broker receives a StopReplicaRequest with delete=true. Ever > since KAFKA-1911, replica deletion has been async, meaning the broker > responds with a StopReplicaResponse simply after marking the replica > directory as staged for deletion. 
This marking happens by moving a data log > directory and its contents such as /tmp/kafka-logs1/t1-0 to a marked > directory like /tmp/kafka-logs1/t1-0.8c9c4c0c61c44cc59ebeb00075a2a07f-delete, > acting as a soft-delete. A scheduled thread later actually deletes the data. > It appears that the regression occurs while the scheduled thread is actually > trying to delete the data, which means the controller considers operations > such as partition reassignment and topic deletion complete. But if you look > at the log4j logs and data logs, you'll find that the soft-deleted data logs > haven't actually been deleted and won't get deleted. It seems that restarting the broker > actually allows for the soft-deleted directories to get deleted. > Here's the setup: > {code} > > ./bin/zookeeper-server-start.sh config/zookeeper.properties > > export LOG_DIR=logs0 && ./bin/kafka-server-start.sh > > config/server0.properties > > export LOG_DIR=logs1 && ./bin/kafka-server-start.sh > > config/server1.properties > > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t0 > > --replica-assignment 1:0 > > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t1 > > --replica-assignment 1:0 > > ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic t0 > > cat p.txt > {"partitions": > [ > {"topic": "t1", "partition": 0, "replicas": [0] } > ], > "version":1 > } > > ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 > > --reassignment-json-file p.txt --execute > {code} > Here are sample logs: > {code} > [2017-04-20 17:46:54,801] INFO [ReplicaFetcherManager on broker 1] Removed > fetcher for partitions t0-0 (kafka.server.ReplicaFetcherManager) > [2017-04-20 17:46:54,814] INFO Log for partition t0-0 is renamed to > /tmp/kafka-logs1/t0-0.bbc8fa126e3e4ff787f6b68d158ab771-delete and is > scheduled for deletion (kafka.log.LogManager) > [2017-04-20 17:47:27,585] INFO Deleting index > /tmp/kafka-logs1/t0-0.bbc8fa126e3e4ff787f6b68d158ab771-delete/.index > 
(kafka.log.OffsetIndex) > [2017-04-20 17:47:27,586] INFO Deleting index > /tmp/kafka-logs1/t0-0/.timeindex (kafka.log.TimeIndex) > [2017-04-20 17:47:27,587] ERROR Exception in deleting > Log(/tmp/kafka-logs1/t0-0.bbc8fa126e3e4ff787f6b68d158ab771-delete). Moving it > to the end of the queue. (kafka.log.LogManager) > java.io.FileNotFoundException: > /tmp/kafka-logs1/t0-0/leader-epoch-checkpoint.tmp (No such file or directory) > at java.io.FileOutputStream.open0(Native Method) > at java.io.FileOutputStream.open(FileOutputStream.java:270) > at java.io.FileOutputStream.<init>(FileOutputStream.java:213) > at
[jira] [Assigned] (KAFKA-5180) Transient failure: ControllerIntegrationTest.testControllerMoveIncrementsControllerEpoch
[ https://issues.apache.org/jira/browse/KAFKA-5180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman reassigned KAFKA-5180: --- Assignee: Onur Karaman > Transient failure: > ControllerIntegrationTest.testControllerMoveIncrementsControllerEpoch > > > Key: KAFKA-5180 > URL: https://issues.apache.org/jira/browse/KAFKA-5180 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Assignee: Onur Karaman > > {code} > Stacktrace org.I0Itec.zkclient.exception.ZkNoNodeException: > org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = > NoNode for /controller_epoch at > org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47) > at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:1001) > at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1100)at > org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1095)at > kafka.utils.ZkUtils.readData(ZkUtils.scala:652) at > kafka.controller.ControllerIntegrationTest.$anonfun$testControllerMoveIncrementsControllerEpoch$2(ControllerIntegrationTest.scala:68) > at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:808)at > kafka.controller.ControllerIntegrationTest.testControllerMoveIncrementsControllerEpoch(ControllerIntegrationTest.scala:68) > {code} > cc [~onurkaraman] > https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk8/detail/kafka-trunk-jdk8/1487/tests -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (KAFKA-5175) Transient failure: ControllerIntegrationTest.testPreferredReplicaLeaderElection
[ https://issues.apache.org/jira/browse/KAFKA-5175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman reassigned KAFKA-5175: --- Assignee: Onur Karaman > Transient failure: > ControllerIntegrationTest.testPreferredReplicaLeaderElection > --- > > Key: KAFKA-5175 > URL: https://issues.apache.org/jira/browse/KAFKA-5175 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Assignee: Onur Karaman > > {code} > java.lang.AssertionError: failed to get expected partition state upon broker > startup > at kafka.utils.TestUtils$.fail(TestUtils.scala:311) > at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:811) > at > kafka.controller.ControllerIntegrationTest.waitForPartitionState(ControllerIntegrationTest.scala:293) > at > kafka.controller.ControllerIntegrationTest.testPreferredReplicaLeaderElection(ControllerIntegrationTest.scala:211) > {code} > https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3497/testReport/kafka.controller/ControllerIntegrationTest/testPreferredReplicaLeaderElection/ -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (KAFKA-5107) remove preferred replica election state from ControllerContext
[ https://issues.apache.org/jira/browse/KAFKA-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-5107: Status: Patch Available (was: In Progress) > remove preferred replica election state from ControllerContext > -- > > Key: KAFKA-5107 > URL: https://issues.apache.org/jira/browse/KAFKA-5107 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > > KAFKA-5028 moves the controller to a single-threaded model, so other work > can no longer be interleaved with a preferred replica leader election, > meaning we don't need to keep its state.
[jira] [Work started] (KAFKA-5107) remove preferred replica election state from ControllerContext
[ https://issues.apache.org/jira/browse/KAFKA-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-5107 started by Onur Karaman. --- > remove preferred replica election state from ControllerContext > -- > > Key: KAFKA-5107 > URL: https://issues.apache.org/jira/browse/KAFKA-5107 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > > KAFKA-5028 moves the controller to a single-threaded model, so other work > can no longer be interleaved with a preferred replica leader election, > meaning we don't need to keep its state.
[jira] [Commented] (KAFKA-5028) convert kafka controller to a single-threaded event queue model
[ https://issues.apache.org/jira/browse/KAFKA-5028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15987101#comment-15987101 ] Onur Karaman commented on KAFKA-5028: - Shortly after check-in, I noticed a bug: the patch has onPreferredReplicaElection keep the line calling topicDeletionManager.markTopicIneligibleForDeletion but removes the line calling topicDeletionManager.resumeDeletionForTopics. I will address this in KAFKA-5107 > convert kafka controller to a single-threaded event queue model > --- > > Key: KAFKA-5028 > URL: https://issues.apache.org/jira/browse/KAFKA-5028 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > Fix For: 0.11.0.0 > > > The goal of this ticket is to improve controller maintainability by > simplifying the controller's concurrency semantics. The controller code has a > lot of shared state between several threads using several concurrency > primitives. This makes the code hard to reason about. > This ticket proposes we convert the controller to a single-threaded event > queue model. We add a new controller thread which processes events held in an > event queue. Note that this does not mean we get rid of all threads used by > the controller. We merely delegate all work that interacts with controller > local state to this single thread. With only a single thread accessing and > modifying the controller local state, we no longer need to worry about > concurrent access, which means we can get rid of the various concurrency > primitives used throughout the controller. > Performance is expected to match existing behavior since the bulk of the > existing controller work today already happens sequentially in the ZkClient’s > single ZkEventThread. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
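The single-threaded event queue model described in KAFKA-5028 can be illustrated with a minimal sketch. This is not the actual controller code: the class ControllerEventQueueSketch, the ControllerEvent interface, the ControllerState holder and the event names are all hypothetical, chosen only to show the shape of the idea. Any thread may enqueue an event, but only one thread ever touches the controller-local state, so that state needs no locks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch; none of these names are taken from the Kafka code base.
class ControllerEventQueueSketch {

    /** Controller-local state. Only the event thread reads or writes it, so it needs no locks. */
    static final class ControllerState {
        final List<String> processed = new ArrayList<>();
    }

    /** One unit of controller work, for example a broker change or a topic deletion. */
    interface ControllerEvent {
        void process(ControllerState state);
    }

    // Sentinel event that tells the event thread to exit.
    private static final ControllerEvent SHUTDOWN = ignored -> { };

    private final BlockingQueue<ControllerEvent> queue = new LinkedBlockingQueue<>();
    private final ControllerState state = new ControllerState();
    private final Thread eventThread = new Thread(this::runLoop, "controller-event-thread");

    void start() {
        eventThread.start();
    }

    /** Any thread (listeners, timers, request handlers) may enqueue events. */
    void put(ControllerEvent event) {
        queue.add(event);
    }

    /** Stops the event thread after the queued events are processed and returns what it processed. */
    List<String> shutdownAndDrain() {
        queue.add(SHUTDOWN);
        try {
            eventThread.join(); // join() makes the event thread's writes visible to the caller
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the event thread", e);
        }
        return state.processed;
    }

    private void runLoop() {
        try {
            while (true) {
                ControllerEvent event = queue.take(); // blocks until an event is available
                if (event == SHUTDOWN) {
                    return;
                }
                event.process(state); // events run strictly one at a time, in FIFO order
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ControllerEventQueueSketch controller = new ControllerEventQueueSketch();
        controller.start();
        controller.put(s -> s.processed.add("BrokerChange"));
        controller.put(s -> s.processed.add("TopicDeletion"));
        System.out.println(controller.shutdownAndDrain());
    }
}
```

The design choice the sketch tries to highlight is that the queue is the only shared, thread-safe structure; everything behind it is plain single-threaded code.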
[jira] [Commented] (KAFKA-5120) Several controller metrics block if controller lock is held by another thread
[ https://issues.apache.org/jira/browse/KAFKA-5120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15982118#comment-15982118 ] Onur Karaman commented on KAFKA-5120: - This will be fixed in KAFKA-5028. It chooses the latter approach you mentioned where the metrics would just read an atomic variable holding the precomputed metric values. > Several controller metrics block if controller lock is held by another thread > - > > Key: KAFKA-5120 > URL: https://issues.apache.org/jira/browse/KAFKA-5120 > Project: Kafka > Issue Type: Bug > Components: controller, metrics >Affects Versions: 0.10.2.0 >Reporter: Tim Carey-Smith >Priority: Minor > > We have been tracking latency issues surrounding queries to Controller > MBeans. Upon digging into the root causes, we discovered that several metrics > acquire the controller lock within the gauge. > The affected metrics are: > * {{ActiveControllerCount}} > * {{OfflinePartitionsCount}} > * {{PreferredReplicaImbalanceCount}} > If the controller is currently holding the lock and a MBean request is > received, the thread executing the request will block until the controller > releases the lock. > We discovered this in a cluster where the controller was holding the lock for > extended periods of time for normal operations. We have documented this issue > in KAFKA-5116. > Several possible solutions exist: > * Remove the lock from inside these {{Gauge}}s. > * Store and update the metric values in {{AtomicLong}}s. > Modifying the {{ActiveControllerCount}} metric seems to be straight-forward > while the other 2 metrics seem to be more involved. > We're happy to contribute a patch, but wanted to discuss potential solutions > and their tradeoffs before proceeding. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
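The approach mentioned in the comment above, where a gauge only reads an atomic variable holding a precomputed value, might look roughly like the following. This is a sketch under assumptions: ControllerGaugeSketch and its method names are hypothetical, and the real metric wiring in Kafka is not shown. The point is that the reporting thread never takes a lock, so it cannot block on the controller.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; class and method names are illustrative, not Kafka's.
class ControllerGaugeSketch {

    // Controller-local state, assumed to be touched only by the single controller thread.
    private final Set<String> offlinePartitions = new HashSet<>();

    // Precomputed metric value, published for metric reporter threads.
    private final AtomicInteger offlinePartitionsCount = new AtomicInteger(0);

    /** Called on the controller thread when a partition goes offline. */
    void onPartitionOffline(String topicPartition) {
        offlinePartitions.add(topicPartition);
        offlinePartitionsCount.set(offlinePartitions.size());
    }

    /** Called on the controller thread when a partition comes back online. */
    void onPartitionOnline(String topicPartition) {
        offlinePartitions.remove(topicPartition);
        offlinePartitionsCount.set(offlinePartitions.size());
    }

    /** What a gauge would return: a single atomic read that takes no lock. */
    int offlinePartitionsCountGauge() {
        return offlinePartitionsCount.get();
    }
}
```

The trade-off in this sketch is that the gauge may briefly report a value from just before the most recent event, in exchange for never waiting on controller work.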
[jira] [Updated] (KAFKA-5107) remove preferred replica election state from ControllerContext
[ https://issues.apache.org/jira/browse/KAFKA-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-5107: Summary: remove preferred replica election state from ControllerContext (was: investigate removing preferred replica leader election state from ControllerContext) > remove preferred replica election state from ControllerContext > -- > > Key: KAFKA-5107 > URL: https://issues.apache.org/jira/browse/KAFKA-5107 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > > KAFKA-5028 moves the controller to a single-threaded model, so other work > can no longer be interleaved with a preferred replica leader election, > meaning we don't need to keep its state.
[jira] [Created] (KAFKA-5107) investigate removing preferred replica leader election state from ControllerContext
Onur Karaman created KAFKA-5107: --- Summary: investigate removing preferred replica leader election state from ControllerContext Key: KAFKA-5107 URL: https://issues.apache.org/jira/browse/KAFKA-5107 Project: Kafka Issue Type: Sub-task Reporter: Onur Karaman Assignee: Onur Karaman KAFKA-5028 moves the controller to a single-threaded model, so other work can no longer be interleaved with a preferred replica leader election, meaning we don't need to keep its state.
[jira] [Commented] (KAFKA-5099) Replica Deletion Regression from KIP-101
[ https://issues.apache.org/jira/browse/KAFKA-5099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977902#comment-15977902 ] Onur Karaman commented on KAFKA-5099: - My initial guess is that the new epoch checkpoint file isn't being moved to the soft-deleted directory correctly. > Replica Deletion Regression from KIP-101 > > > Key: KAFKA-5099 > URL: https://issues.apache.org/jira/browse/KAFKA-5099 > Project: Kafka > Issue Type: Bug >Reporter: Onur Karaman >Assignee: Onur Karaman > > It appears that replica deletion regressed from KIP-101. Replica deletion > happens when a broker receives a StopReplicaRequest with delete=true. Ever > since KAFKA-1911, replica deletion has been async, meaning the broker > responds with a StopReplicaResponse simply after marking the replica > directory as staged for deletion. This marking happens by moving a data log > directory and its contents such as /tmp/kafka-logs1/t1-0 to a marked > directory like /tmp/kafka-logs1/t1-0.8c9c4c0c61c44cc59ebeb00075a2a07f-delete, > acting as a soft-delete. A scheduled thread later actually deletes the data. > It appears that the regression occurs while the scheduled thread is actually > trying to delete the data, which means the controller considers operations > such as partition reassignment and topic deletion complete. But if you look > at the log4j logs and data logs, you'll find that the soft-deleted data logs > haven't actually been deleted and won't get deleted. It seems that restarting the broker > actually allows for the soft-deleted directories to get deleted. 
> Here's the setup: > {code} > > ./bin/zookeeper-server-start.sh config/zookeeper.properties > > export LOG_DIR=logs0 && ./bin/kafka-server-start.sh > > config/server0.properties > > export LOG_DIR=logs1 && ./bin/kafka-server-start.sh > > config/server1.properties > > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t0 > > --replica-assignment 1:0 > > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t1 > > --replica-assignment 1:0 > > ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic t0 > > cat p.txt > {"partitions": > [ > {"topic": "t1", "partition": 0, "replicas": [0] } > ], > "version":1 > } > > ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 > > --reassignment-json-file p.txt --execute > {code} > Here are sample logs: > {code} > [2017-04-20 17:46:54,801] INFO [ReplicaFetcherManager on broker 1] Removed > fetcher for partitions t0-0 (kafka.server.ReplicaFetcherManager) > [2017-04-20 17:46:54,814] INFO Log for partition t0-0 is renamed to > /tmp/kafka-logs1/t0-0.bbc8fa126e3e4ff787f6b68d158ab771-delete and is > scheduled for deletion (kafka.log.LogManager) > [2017-04-20 17:47:27,585] INFO Deleting index > /tmp/kafka-logs1/t0-0.bbc8fa126e3e4ff787f6b68d158ab771-delete/.index > (kafka.log.OffsetIndex) > [2017-04-20 17:47:27,586] INFO Deleting index > /tmp/kafka-logs1/t0-0/.timeindex (kafka.log.TimeIndex) > [2017-04-20 17:47:27,587] ERROR Exception in deleting > Log(/tmp/kafka-logs1/t0-0.bbc8fa126e3e4ff787f6b68d158ab771-delete). Moving it > to the end of the queue. 
(kafka.log.LogManager) > java.io.FileNotFoundException: > /tmp/kafka-logs1/t0-0/leader-epoch-checkpoint.tmp (No such file or directory) > at java.io.FileOutputStream.open0(Native Method) > at java.io.FileOutputStream.open(FileOutputStream.java:270) > at java.io.FileOutputStream.<init>(FileOutputStream.java:213) > at java.io.FileOutputStream.<init>(FileOutputStream.java:162) > at kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:41) > at > kafka.server.checkpoints.LeaderEpochCheckpointFile.write(LeaderEpochCheckpointFile.scala:61) > at > kafka.server.epoch.LeaderEpochFileCache.kafka$server$epoch$LeaderEpochFileCache$$flush(LeaderEpochFileCache.scala:178) > at > kafka.server.epoch.LeaderEpochFileCache$$anonfun$clear$1.apply$mcV$sp(LeaderEpochFileCache.scala:161) > at > kafka.server.epoch.LeaderEpochFileCache$$anonfun$clear$1.apply(LeaderEpochFileCache.scala:159) > at > kafka.server.epoch.LeaderEpochFileCache$$anonfun$clear$1.apply(LeaderEpochFileCache.scala:159) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213) > at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221) > at > kafka.server.epoch.LeaderEpochFileCache.clear(LeaderEpochFileCache.scala:159) > at kafka.log.Log.delete(Log.scala:1051) > at > kafka.log.LogManager.kafka$log$LogManager$$deleteLogs(LogManager.scala:442) > at > kafka.log.LogManager$$anonfun$startup$5.apply$mcV$sp(LogManager.scala:241) > at > kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110) > at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:57) > at
[jira] [Created] (KAFKA-5099) Replica Deletion Regression from KIP-101
Onur Karaman created KAFKA-5099: --- Summary: Replica Deletion Regression from KIP-101 Key: KAFKA-5099 URL: https://issues.apache.org/jira/browse/KAFKA-5099 Project: Kafka Issue Type: Bug Reporter: Onur Karaman Assignee: Onur Karaman It appears that replica deletion regressed from KIP-101. Replica deletion happens when a broker receives a StopReplicaRequest with delete=true. Ever since KAFKA-1911, replica deletion has been async, meaning the broker responds with a StopReplicaResponse simply after marking the replica directory as staged for deletion. This marking happens by moving a data log directory and its contents such as /tmp/kafka-logs1/t1-0 to a marked directory like /tmp/kafka-logs1/t1-0.8c9c4c0c61c44cc59ebeb00075a2a07f-delete, acting as a soft-delete. A scheduled thread later actually deletes the data. It appears that the regression occurs while the scheduled thread is actually trying to delete the data, which means the controller considers operations such as partition reassignment and topic deletion complete. But if you look at the log4j logs and data logs, you'll find that the soft-deleted data logs haven't actually been deleted and won't get deleted. It seems that restarting the broker actually allows for the soft-deleted directories to get deleted. 
Here's the setup: {code} > ./bin/zookeeper-server-start.sh config/zookeeper.properties > export LOG_DIR=logs0 && ./bin/kafka-server-start.sh config/server0.properties > export LOG_DIR=logs1 && ./bin/kafka-server-start.sh config/server1.properties > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t0 > --replica-assignment 1:0 > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t1 > --replica-assignment 1:0 > ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic t0 > cat p.txt {"partitions": [ {"topic": "t1", "partition": 0, "replicas": [0] } ], "version":1 } > ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 > --reassignment-json-file p.txt --execute {code} Here are sample logs: {code} [2017-04-20 17:46:54,801] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions t0-0 (kafka.server.ReplicaFetcherManager) [2017-04-20 17:46:54,814] INFO Log for partition t0-0 is renamed to /tmp/kafka-logs1/t0-0.bbc8fa126e3e4ff787f6b68d158ab771-delete and is scheduled for deletion (kafka.log.LogManager) [2017-04-20 17:47:27,585] INFO Deleting index /tmp/kafka-logs1/t0-0.bbc8fa126e3e4ff787f6b68d158ab771-delete/.index (kafka.log.OffsetIndex) [2017-04-20 17:47:27,586] INFO Deleting index /tmp/kafka-logs1/t0-0/.timeindex (kafka.log.TimeIndex) [2017-04-20 17:47:27,587] ERROR Exception in deleting Log(/tmp/kafka-logs1/t0-0.bbc8fa126e3e4ff787f6b68d158ab771-delete). Moving it to the end of the queue. 
(kafka.log.LogManager) java.io.FileNotFoundException: /tmp/kafka-logs1/t0-0/leader-epoch-checkpoint.tmp (No such file or directory) at java.io.FileOutputStream.open0(Native Method) at java.io.FileOutputStream.open(FileOutputStream.java:270) at java.io.FileOutputStream.<init>(FileOutputStream.java:213) at java.io.FileOutputStream.<init>(FileOutputStream.java:162) at kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:41) at kafka.server.checkpoints.LeaderEpochCheckpointFile.write(LeaderEpochCheckpointFile.scala:61) at kafka.server.epoch.LeaderEpochFileCache.kafka$server$epoch$LeaderEpochFileCache$$flush(LeaderEpochFileCache.scala:178) at kafka.server.epoch.LeaderEpochFileCache$$anonfun$clear$1.apply$mcV$sp(LeaderEpochFileCache.scala:161) at kafka.server.epoch.LeaderEpochFileCache$$anonfun$clear$1.apply(LeaderEpochFileCache.scala:159) at kafka.server.epoch.LeaderEpochFileCache$$anonfun$clear$1.apply(LeaderEpochFileCache.scala:159) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213) at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221) at kafka.server.epoch.LeaderEpochFileCache.clear(LeaderEpochFileCache.scala:159) at kafka.log.Log.delete(Log.scala:1051) at kafka.log.LogManager.kafka$log$LogManager$$deleteLogs(LogManager.scala:442) at kafka.log.LogManager$$anonfun$startup$5.apply$mcV$sp(LogManager.scala:241) at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110) at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:57) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at
[jira] [Created] (KAFKA-5083) always leave the last surviving member of the ISR in ZK
Onur Karaman created KAFKA-5083: --- Summary: always leave the last surviving member of the ISR in ZK Key: KAFKA-5083 URL: https://issues.apache.org/jira/browse/KAFKA-5083 Project: Kafka Issue Type: Sub-task Reporter: Onur Karaman Assignee: Onur Karaman Currently we erase ISR membership if the replica to be removed from the ISR is the last surviving member of the ISR and unclean leader election is enabled for the corresponding topic. We should investigate leaving the last replica in the ISR in ZK, regardless of whether unclean leader election is enabled. That way, if people later disable unclean leader election again, we can still try to elect the leader from the last in-sync replica.
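The rule proposed in KAFKA-5083 can be stated as a small function. The sketch below is only an illustration of the proposal, not how the controller computes ISR changes: IsrShrinkSketch and removeFromIsr are hypothetical names, and replicas are represented as plain broker ids. The function removes a replica from the ISR unless that would leave the ISR empty, in which case the last surviving member is kept.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposed rule; not taken from the Kafka code base.
class IsrShrinkSketch {

    /**
     * Returns the ISR after removing replicaId, except that the last
     * surviving member is never removed, so the ISR is never emptied by a removal.
     */
    static List<Integer> removeFromIsr(List<Integer> isr, int replicaId) {
        List<Integer> shrunk = new ArrayList<>(isr);
        shrunk.remove(Integer.valueOf(replicaId)); // remove by value, not by index
        if (shrunk.isEmpty()) {
            // Removing the replica would erase ISR membership: keep the ISR as it was.
            return new ArrayList<>(isr);
        }
        return shrunk;
    }

    public static void main(String[] args) {
        System.out.println(removeFromIsr(List.of(1, 0), 0));
        System.out.println(removeFromIsr(List.of(1), 1));
    }
}
```

Note that the sketch applies the rule unconditionally, which matches the ticket's suggestion that the behavior should not depend on whether unclean leader election is enabled.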
[jira] [Updated] (KAFKA-5069) add controller integration tests
[ https://issues.apache.org/jira/browse/KAFKA-5069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-5069: Status: Patch Available (was: In Progress) > add controller integration tests > > > Key: KAFKA-5069 > URL: https://issues.apache.org/jira/browse/KAFKA-5069 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > > Test the various controller protocols by observing zookeeper and broker state.
[jira] [Updated] (KAFKA-5028) convert kafka controller to a single-threaded event queue model
[ https://issues.apache.org/jira/browse/KAFKA-5028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-5028: Status: Patch Available (was: In Progress) > convert kafka controller to a single-threaded event queue model > --- > > Key: KAFKA-5028 > URL: https://issues.apache.org/jira/browse/KAFKA-5028 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > > The goal of this ticket is to improve controller maintainability by > simplifying the controller's concurrency semantics. The controller code has a > lot of shared state between several threads using several concurrency > primitives. This makes the code hard to reason about. > This ticket proposes we convert the controller to a single-threaded event > queue model. We add a new controller thread which processes events held in an > event queue. Note that this does not mean we get rid of all threads used by > the controller. We merely delegate all work that interacts with controller > local state to this single thread. With only a single thread accessing and > modifying the controller local state, we no longer need to worry about > concurrent access, which means we can get rid of the various concurrency > primitives used throughout the controller. > Performance is expected to match existing behavior since the bulk of the > existing controller work today already happens sequentially in the ZkClient’s > single ZkEventThread. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Work started] (KAFKA-5069) add controller integration tests
[ https://issues.apache.org/jira/browse/KAFKA-5069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-5069 started by Onur Karaman. --- > add controller integration tests > > > Key: KAFKA-5069 > URL: https://issues.apache.org/jira/browse/KAFKA-5069 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > > Test the various controller protocols by observing zookeeper and broker state.
[jira] [Created] (KAFKA-5069) add controller integration tests
Onur Karaman created KAFKA-5069: --- Summary: add controller integration tests Key: KAFKA-5069 URL: https://issues.apache.org/jira/browse/KAFKA-5069 Project: Kafka Issue Type: Sub-task Reporter: Onur Karaman Assignee: Onur Karaman Test the various controller protocols by observing zookeeper and broker state. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Work started] (KAFKA-5028) convert kafka controller to a single-threaded event queue model
[ https://issues.apache.org/jira/browse/KAFKA-5028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-5028 started by Onur Karaman. --- > convert kafka controller to a single-threaded event queue model > --- > > Key: KAFKA-5028 > URL: https://issues.apache.org/jira/browse/KAFKA-5028 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > > The goal of this ticket is to improve controller maintainability by > simplifying the controller's concurrency semantics. The controller code has a > lot of shared state between several threads using several concurrency > primitives. This makes the code hard to reason about. > This ticket proposes we convert the controller to a single-threaded event > queue model. We add a new controller thread which processes events held in an > event queue. Note that this does not mean we get rid of all threads used by > the controller. We merely delegate all work that interacts with controller > local state to this single thread. With only a single thread accessing and > modifying the controller local state, we no longer need to worry about > concurrent access, which means we can get rid of the various concurrency > primitives used throughout the controller. > Performance is expected to match existing behavior since the bulk of the > existing controller work today already happens sequentially in the ZkClient’s > single ZkEventThread. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (KAFKA-5028) convert kafka controller to a single-threaded event queue model
[ https://issues.apache.org/jira/browse/KAFKA-5028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-5028: Summary: convert kafka controller to a single-threaded event queue model (was: Convert Kafka Controller to a single-threaded event queue model) > convert kafka controller to a single-threaded event queue model > --- > > Key: KAFKA-5028 > URL: https://issues.apache.org/jira/browse/KAFKA-5028 > Project: Kafka > Issue Type: Sub-task >Reporter: Onur Karaman >Assignee: Onur Karaman > > The goal of this ticket is to improve controller maintainability by > simplifying the controller's concurrency semantics. The controller code has a > lot of shared state between several threads using several concurrency > primitives. This makes the code hard to reason about. > This ticket proposes we convert the controller to a single-threaded event > queue model. We add a new controller thread which processes events held in an > event queue. Note that this does not mean we get rid of all threads used by > the controller. We merely delegate all work that interacts with controller > local state to this single thread. With only a single thread accessing and > modifying the controller local state, we no longer need to worry about > concurrent access, which means we can get rid of the various concurrency > primitives used throughout the controller. > Performance is expected to match existing behavior since the bulk of the > existing controller work today already happens sequentially in the ZkClient’s > single ZkEventThread. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-5028) Convert Kafka Controller to a single-threaded event queue model
Onur Karaman created KAFKA-5028: --- Summary: Convert Kafka Controller to a single-threaded event queue model Key: KAFKA-5028 URL: https://issues.apache.org/jira/browse/KAFKA-5028 Project: Kafka Issue Type: Sub-task Reporter: Onur Karaman Assignee: Onur Karaman The goal of this ticket is to improve controller maintainability by simplifying the controller's concurrency semantics. The controller code has a lot of shared state between several threads using several concurrency primitives. This makes the code hard to reason about. This ticket proposes we convert the controller to a single-threaded event queue model. We add a new controller thread which processes events held in an event queue. Note that this does not mean we get rid of all threads used by the controller. We merely delegate all work that interacts with controller local state to this single thread. With only a single thread accessing and modifying the controller local state, we no longer need to worry about concurrent access, which means we can get rid of the various concurrency primitives used throughout the controller. Performance is expected to match existing behavior since the bulk of the existing controller work today already happens sequentially in the ZkClient’s single ZkEventThread. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
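The single-threaded event queue model proposed above can be sketched roughly as follows. This is a minimal illustration, not the controller's actual code: the class name ControllerEventQueueSketch, the ControllerEvent interface, and the brokerAdded helper are all hypothetical. Any thread may enqueue an event, but only one thread ever touches controller-local state, so that state needs no locks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: class and method names are illustrative, not Kafka's.
class ControllerEventQueueSketch {
    // A unit of work that is allowed to touch controller-local state.
    interface ControllerEvent {
        void process();
    }

    private final BlockingQueue<ControllerEvent> queue = new LinkedBlockingQueue<>();
    private final Thread controllerThread = new Thread(this::run, "controller-event-thread");

    // Controller-local state. Only the controller thread reads or writes these
    // fields while it is running, so they need no locks.
    private final List<Integer> liveBrokers = new ArrayList<>();
    private boolean running = true;

    void start() {
        controllerThread.start();
    }

    // Any thread (ZooKeeper listeners, timers, request callbacks) may enqueue.
    void put(ControllerEvent event) {
        queue.add(event);
    }

    // Body of the controller thread: process events strictly one at a time.
    private void run() {
        try {
            while (running) {
                queue.take().process();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Enqueue a stop event so earlier events finish first, then wait for the
    // thread. The returned snapshot is safe to read once join() has returned.
    List<Integer> shutdown() {
        put(() -> running = false);
        try {
            controllerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(liveBrokers);
    }

    // Example event: registering a broker mutates controller-local state.
    ControllerEvent brokerAdded(int brokerId) {
        return () -> liveBrokers.add(brokerId);
    }

    public static void main(String[] args) {
        ControllerEventQueueSketch controller = new ControllerEventQueueSketch();
        controller.start();
        // Two "listener" threads enqueue concurrently; processing stays sequential.
        Thread a = new Thread(() -> controller.put(controller.brokerAdded(1)));
        Thread b = new Thread(() -> controller.put(controller.brokerAdded(2)));
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println(controller.shutdown().size());
    }
}
```

Shutdown is itself an event in this sketch, which keeps even the stop flag confined to the controller thread. Other threads (for example, request senders) can still exist; they simply hand results back by enqueueing events.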
[jira] [Created] (KAFKA-5027) Kafka Controller Redesign
Onur Karaman created KAFKA-5027: --- Summary: Kafka Controller Redesign Key: KAFKA-5027 URL: https://issues.apache.org/jira/browse/KAFKA-5027 Project: Kafka Issue Type: Improvement Reporter: Onur Karaman Assignee: Onur Karaman The goal of this redesign is to improve controller performance, controller maintainability, and cluster reliability. Documentation regarding what's being considered can be found [here|https://docs.google.com/document/d/1rLDmzDOGQQeSiMANP0rC2RYp_L7nUGHzFD9MQISgXYM]. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (KAFKA-4959) remove controller concurrent access to non-threadsafe NetworkClient, Selector, and SSLEngine
[ https://issues.apache.org/jira/browse/KAFKA-4959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-4959: Status: Patch Available (was: In Progress) > remove controller concurrent access to non-threadsafe NetworkClient, > Selector, and SSLEngine > > > Key: KAFKA-4959 > URL: https://issues.apache.org/jira/browse/KAFKA-4959 > Project: Kafka > Issue Type: Bug >Reporter: Onur Karaman >Assignee: Onur Karaman > > This brought down a cluster by causing continuous controller moves. > ZkClient's ZkEventThread and a RequestSendThread can concurrently use objects > that aren't thread-safe: > * Selector > * NetworkClient > * SSLEngine (this was the big one for us. We turn on SSL for interbroker > communication). > As per the "Concurrency Notes" section from the [SSLEngine > javadoc|https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html]: > bq. two threads must not attempt to call the same method (either wrap() or > unwrap()) concurrently > SSLEngine.wrap gets called in: > * SslTransportLayer.write > * SslTransportLayer.handshake > * SslTransportLayer.close > It turns out that the ZkEventThread and RequestSendThread can concurrently > call SSLEngine.wrap: > * ZkEventThread calls SslTransportLayer.close from > ControllerChannelManager.removeExistingBroker > * RequestSendThread can call SslTransportLayer.write or > SslTransportLayer.handshake from NetworkClient.poll > Suppose the controller moves for whatever reason. The former controller could > have had a RequestSendThread who was in the middle of sending out messages to > the cluster while the ZkEventThread began executing > KafkaController.onControllerResignation, which calls > ControllerChannelManager.shutdown, which sequentially cleans up the > controller-to-broker queue and connection for every broker in the cluster. 
> This cleanup includes the call to > ControllerChannelManager.removeExistingBroker as mentioned earlier, causing > the concurrent call to SSLEngine.wrap. This concurrent call throws a > BufferOverflowException, which ControllerChannelManager.removeExistingBroker > catches, so ControllerChannelManager.shutdown moves on to cleaning up the > next controller-to-broker queue and connection, skipping the remaining cleanup steps > for the current broker, such as clearing the queue, stopping the RequestSendThread, and removing the > entry from its brokerStateInfo map. > Because Selector.close fails partway through, the sensors corresponding to the broker > connection have not been cleaned up. Any later attempt at initializing an > identical Selector will result in a sensor collision and therefore cause > Selector initialization to throw an exception. In other words, any later > attempt by this broker to become controller again will fail on > initialization. When controller initialization fails, the controller deletes > the /controller znode and lets another broker take over. > Now suppose the controller moves enough times that every broker hits the > BufferOverflowException concurrency issue. We're now guaranteed to fail > controller initialization due to the sensor collision on every controller > transition, so the controller will move across brokers continuously. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Work started] (KAFKA-4959) remove controller concurrent access to non-threadsafe NetworkClient, Selector, and SSLEngine
[ https://issues.apache.org/jira/browse/KAFKA-4959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-4959 started by Onur Karaman. --- > remove controller concurrent access to non-threadsafe NetworkClient, > Selector, and SSLEngine > > > Key: KAFKA-4959 > URL: https://issues.apache.org/jira/browse/KAFKA-4959 > Project: Kafka > Issue Type: Bug >Reporter: Onur Karaman >Assignee: Onur Karaman > > This brought down a cluster by causing continuous controller moves. > ZkClient's ZkEventThread and a RequestSendThread can concurrently use objects > that aren't thread-safe: > * Selector > * NetworkClient > * SSLEngine (this was the big one for us. We turn on SSL for interbroker > communication). > As per the "Concurrency Notes" section from the [SSLEngine > javadoc|https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html]: > bq. two threads must not attempt to call the same method (either wrap() or > unwrap()) concurrently > SSLEngine.wrap gets called in: > * SslTransportLayer.write > * SslTransportLayer.handshake > * SslTransportLayer.close > It turns out that the ZkEventThread and RequestSendThread can concurrently > call SSLEngine.wrap: > * ZkEventThread calls SslTransportLayer.close from > ControllerChannelManager.removeExistingBroker > * RequestSendThread can call SslTransportLayer.write or > SslTransportLayer.handshake from NetworkClient.poll > Suppose the controller moves for whatever reason. The former controller could > have had a RequestSendThread who was in the middle of sending out messages to > the cluster while the ZkEventThread began executing > KafkaController.onControllerResignation, which calls > ControllerChannelManager.shutdown, which sequentially cleans up the > controller-to-broker queue and connection for every broker in the cluster. > This cleanup includes the call to > ControllerChannelManager.removeExistingBroker as mentioned earlier, causing > the concurrent call to SSLEngine.wrap. 
This concurrent call throws a > BufferOverflowException, which ControllerChannelManager.removeExistingBroker > catches, so ControllerChannelManager.shutdown moves on to cleaning up the > next controller-to-broker queue and connection, skipping the remaining cleanup steps > for the current broker, such as clearing the queue, stopping the RequestSendThread, and removing the > entry from its brokerStateInfo map. > Because Selector.close fails partway through, the sensors corresponding to the broker > connection have not been cleaned up. Any later attempt at initializing an > identical Selector will result in a sensor collision and therefore cause > Selector initialization to throw an exception. In other words, any later > attempt by this broker to become controller again will fail on > initialization. When controller initialization fails, the controller deletes > the /controller znode and lets another broker take over. > Now suppose the controller moves enough times that every broker hits the > BufferOverflowException concurrency issue. We're now guaranteed to fail > controller initialization due to the sensor collision on every controller > transition, so the controller will move across brokers continuously. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-4959) remove controller concurrent access to non-threadsafe NetworkClient, Selector, and SSLEngine
Onur Karaman created KAFKA-4959: --- Summary: remove controller concurrent access to non-threadsafe NetworkClient, Selector, and SSLEngine Key: KAFKA-4959 URL: https://issues.apache.org/jira/browse/KAFKA-4959 Project: Kafka Issue Type: Bug Reporter: Onur Karaman Assignee: Onur Karaman This brought down a cluster by causing continuous controller moves. ZkClient's ZkEventThread and a RequestSendThread can concurrently use objects that aren't thread-safe: * Selector * NetworkClient * SSLEngine (this was the big one for us. We turn on SSL for interbroker communication). As per the "Concurrency Notes" section from the [SSLEngine javadoc|https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html]: bq. two threads must not attempt to call the same method (either wrap() or unwrap()) concurrently SSLEngine.wrap gets called in: * SslTransportLayer.write * SslTransportLayer.handshake * SslTransportLayer.close It turns out that the ZkEventThread and RequestSendThread can concurrently call SSLEngine.wrap: * ZkEventThread calls SslTransportLayer.close from ControllerChannelManager.removeExistingBroker * RequestSendThread can call SslTransportLayer.write or SslTransportLayer.handshake from NetworkClient.poll Suppose the controller moves for whatever reason. The former controller could have had a RequestSendThread who was in the middle of sending out messages to the cluster while the ZkEventThread began executing KafkaController.onControllerResignation, which calls ControllerChannelManager.shutdown, which sequentially cleans up the controller-to-broker queue and connection for every broker in the cluster. This cleanup includes the call to ControllerChannelManager.removeExistingBroker as mentioned earlier, causing the concurrent call to SSLEngine.wrap. 
This concurrent call throws a BufferOverflowException, which ControllerChannelManager.removeExistingBroker catches, so ControllerChannelManager.shutdown moves on to cleaning up the next controller-to-broker queue and connection, skipping the remaining cleanup steps for the current broker, such as clearing the queue, stopping the RequestSendThread, and removing the entry from its brokerStateInfo map. Because Selector.close fails partway through, the sensors corresponding to the broker connection have not been cleaned up. Any later attempt at initializing an identical Selector will result in a sensor collision and therefore cause Selector initialization to throw an exception. In other words, any later attempt by this broker to become controller again will fail on initialization. When controller initialization fails, the controller deletes the /controller znode and lets another broker take over. Now suppose the controller moves enough times that every broker hits the BufferOverflowException concurrency issue. We're now guaranteed to fail controller initialization due to the sensor collision on every controller transition, so the controller will move across brokers continuously. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
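The failure chain described in this ticket (a close that throws partway, a swallowed exception, a leftover sensor, and a collision on the next initialization) can be modeled in a few lines. This is only a toy model under stated assumptions: Registry, Connection, and canBecomeControllerAgain are hypothetical stand-ins, not Kafka's Metrics, Selector, or ControllerChannelManager, and the transportFails flag simply simulates the BufferOverflowException from the concurrent SSLEngine.wrap call.

```java
import java.nio.BufferOverflowException;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the failure chain; none of these types are Kafka's.
class SensorCollisionSketch {
    // Stand-in for a metrics registry that rejects duplicate names.
    static final class Registry {
        private final Set<String> names = new HashSet<>();

        void register(String name) {
            if (!names.add(name)) {
                throw new IllegalArgumentException("A metric named '" + name + "' already exists");
            }
        }

        void unregister(String name) {
            names.remove(name);
        }
    }

    // Stand-in for a per-broker connection whose close() may fail midway.
    static final class Connection {
        private final Registry registry;
        private final String metric;

        Connection(Registry registry, int brokerId) {
            this.registry = registry;
            this.metric = "connection-close-rate[broker-id=" + brokerId + "]";
            registry.register(metric); // throws on collision, as initialization does in the ticket
        }

        void close(boolean transportFails) {
            if (transportFails) {
                // Simulates the exception raised by the concurrent wrap() call.
                throw new BufferOverflowException();
            }
            registry.unregister(metric); // never reached when the transport fails
        }
    }

    // Returns true if a second initialization against the same registry succeeds.
    static boolean canBecomeControllerAgain(boolean transportFails) {
        Registry registry = new Registry();
        Connection first = new Connection(registry, 10);
        try {
            first.close(transportFails);
        } catch (RuntimeException e) {
            // Swallowed, as in the ticket: shutdown moves on to the next broker.
        }
        try {
            new Connection(registry, 10);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canBecomeControllerAgain(false));
        System.out.println(canBecomeControllerAgain(true));
    }
}
```

In this model, a clean close lets the broker initialize again, while a close that throws before unregistering leaves the metric behind and makes every later initialization fail. One way to read the ticket's title is that removing the concurrent access removes the trigger for the partial close in the first place.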
[jira] [Commented] (KAFKA-4900) Brokers stuck in controller re-election loop after failing to register metrics
[ https://issues.apache.org/jira/browse/KAFKA-4900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943568#comment-15943568 ] Onur Karaman commented on KAFKA-4900: - [~nickt] Do you think it makes sense for me to open a new ticket specific to the BufferOverflowException concurrency issue and link all related underlying issues to this ticket? > Brokers stuck in controller re-election loop after failing to register metrics > -- > > Key: KAFKA-4900 > URL: https://issues.apache.org/jira/browse/KAFKA-4900 > Project: Kafka > Issue Type: Bug > Components: controller, core >Affects Versions: 0.10.1.1 >Reporter: Nick Travers > > We hit this today in one of our three-node staging clusters. The exception > continues to occur on all three nodes. > {code} > 2017-03-15 02:17:30,677 ERROR > [ZkClient-EventThread-35-samsa-zkserver.stage.sjc1.square:26101/samsa] > server.ZookeeperLeaderElector - Error while electing or becoming leader on > broker 9 > java.lang.IllegalArgumentException: A metric named 'MetricName > [name=connection-close-rate, > group=controller-channel-metrics,description=Connections closed per second in > the window., tags={broker-id=10}]' already exists, can't register another one.
> at > org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:380) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:179) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:164) > at > org.apache.kafka.common.network.Selector$SelectorMetrics.<init>(Selector.java:617) > at org.apache.kafka.common.network.Selector.<init>(Selector.java:138) > at > kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:101) > at > kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:45) > at > kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:45) > at scala.collection.immutable.Set$Set3.foreach(Set.scala:163) > at > kafka.controller.ControllerChannelManager.<init>(ControllerChannelManager.scala:45) > at > kafka.controller.KafkaController.startChannelManager(KafkaController.scala:814) > at > kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:742) > at > kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:334) > at > kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:167) > at > kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84) > at > kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146) > at > kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141) > at > kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234) > at > kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141) > at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:824) > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) >
{code} > We observe a tight loop of controller (re-)election, i.e. one node hits this > exception, and leadership transitions to the next, which then hits the > exception, ad infinitum. > Producers and consumers appear to be connecting ok, and are able to produce > and consume messages. > Relevant data points: > - prior to this cluster restart, a partition reassignment was attempted for > a number of topics, which appeared to get stuck in the "in progress" state > (on the order of days) > - these topics were subsequently deleted > - a rolling restart of the cluster was performed to turn on > broker-to-broker SSL communication > - the SSL change was subsequently _rolled back_ after we observed these > exceptions > - the entire cluster was shut down, and nodes brought back one at a time in > an attempt to clear the exception. We were able to restart the cluster, but > we continue to see the exceptions > We also observed, at the same time as the exception above, the following > exception on all hosts: > {code} > 2017-03-15 01:44:04,572 ERROR > [ZkClient-EventThread-36-samsa-zkserver.stage.sjc1.square:26101/samsa] > controller.ReplicaStateMachine$BrokerChangeListener - [BrokerChangeListener > on Controller 10]: Error while handling broker changes > java.lang.ClassCastException: java.lang.String cannot be cast to >
[jira] [Commented] (KAFKA-4900) Brokers stuck in controller re-election loop after failing to register metrics
[ https://issues.apache.org/jira/browse/KAFKA-4900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941296#comment-15941296 ] Onur Karaman commented on KAFKA-4900: - I haven't figured out some of the small details but I think I figured out the gist of what happened at LinkedIn. One problem is that ZkClient's ZkEventThread and a RequestSendThread can concurrently use objects that aren't thread-safe: * Selector * NetworkClient * SSLEngine (this was the big one for us. We turn on SSL for interbroker communication). As per the "Concurrency Notes" section from the [SSLEngine javadoc|https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html]: bq. two threads must not attempt to call the same method (either wrap() or unwrap()) concurrently SSLEngine.wrap gets called in: * SslTransportLayer.write * SslTransportLayer.handshake * SslTransportLayer.close It turns out that the ZkEventThread and RequestSendThread can concurrently call SSLEngine.wrap: * ZkEventThread calls SslTransportLayer.close from ControllerChannelManager.removeExistingBroker * RequestSendThread can call SslTransportLayer.write or SslTransportLayer.handshake from NetworkClient.poll Suppose the controller moves for whatever reason. The former controller could have had a RequestSendThread who was in the middle of sending out messages to the cluster while the ZkEventThread began executing KafkaController.onControllerResignation, which calls ControllerChannelManager.shutdown, which sequentially cleans up the controller-to-broker queue and connection for every broker in the cluster. This cleanup includes the call to ControllerChannelManager.removeExistingBroker as mentioned earlier, causing the concurrent call to SSLEngine.wrap. 
This concurrent call throws a BufferOverflowException, which ControllerChannelManager.removeExistingBroker catches, so ControllerChannelManager.shutdown moves on to cleaning up the next controller-to-broker queue and connection, skipping the remaining cleanup steps for the current broker, such as clearing the queue, stopping the RequestSendThread, and removing the entry from its brokerStateInfo map. Because Selector.close fails partway through, the sensors corresponding to the broker connection have not been cleaned up. Any later attempt at initializing an identical Selector will result in a sensor collision and therefore cause Selector initialization to throw an exception. In other words, any later attempt by this broker to become controller again will fail on initialization. When controller initialization fails, the controller deletes the /controller znode and lets another broker take over. Now suppose the controller moves enough times that every broker hits the BufferOverflowException concurrency issue. We're now guaranteed to fail controller initialization due to the sensor collision on every controller transition, so the controller will move across brokers continuously. The "connection-close-rate" metric is specifically cited because that's always the first sensor registered upon Selector initialization and is therefore the collision we see in the logs. Now the question is: why did the controller move enough times to hit every broker in the cluster with the BufferOverflowException to begin with? Considering the BufferOverflowException-on-resignation issue alone, the cluster should plausibly stabilize after the first BufferOverflowException. We were running 0.10.1.1, where a controller can process events from ZkClient even after resigning as controller (fixed in 0.10.2.0 in KAFKA-4447).
For instance, processing an isr change notification after resignation causes a NullPointerException while broadcasting an UpdateMetadataRequest to the cluster since the earlier controller resignation made the ControllerChannelManager null. This NullPointerException causes the controller to delete the /controller znode even though it's no longer the controller, kicking out the current active controller, potentially causing the same scenario to repeat. Every broker's logs had reported this NullPointerException from the IsrChangeNotificationListener, which explains how the controller moved around enough times such that every broker experienced the initial BufferOverflowException. > Brokers stuck in controller re-election loop after failing to register metrics > -- > > Key: KAFKA-4900 > URL: https://issues.apache.org/jira/browse/KAFKA-4900 > Project: Kafka > Issue Type: Bug > Components: controller, core >Affects Versions: 0.10.1.1 >Reporter: Nick Travers > > We hit this today in one of out three node staging clusters. The exception > continues to occur on all three nodes. > {code} > 2017-03-15 02:17:30,677 ERROR > [ZkClient-EventThread-35-samsa-zkserver.stage.sjc1.square:26101/samsa] >
[jira] [Commented] (KAFKA-4900) Brokers stuck in controller re-election loop after failing to register metrics
[ https://issues.apache.org/jira/browse/KAFKA-4900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929447#comment-15929447 ] Onur Karaman commented on KAFKA-4900: - Okay yeah with bad znode data, I could see how the controller would bounce around until the znode was cleaned. We didn't have a malformed znode so we're experiencing something else. I think I made some decent progress today. I'll make sure to report back what happened to us once it's all figured out. > Brokers stuck in controller re-election loop after failing to register metrics > -- > > Key: KAFKA-4900 > URL: https://issues.apache.org/jira/browse/KAFKA-4900 > Project: Kafka > Issue Type: Bug > Components: controller, core >Affects Versions: 0.10.1.1 >Reporter: Nick Travers > > We hit this today in one of our three-node staging clusters. The exception > continues to occur on all three nodes. > {code} > 2017-03-15 02:17:30,677 ERROR > [ZkClient-EventThread-35-samsa-zkserver.stage.sjc1.square:26101/samsa] > server.ZookeeperLeaderElector - Error while electing or becoming leader on > broker 9 > java.lang.IllegalArgumentException: A metric named 'MetricName > [name=connection-close-rate, > group=controller-channel-metrics,description=Connections closed per second in > the window., tags={broker-id=10}]' already exists, can't register another one.
> at > org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:380) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:179) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:164) > at > org.apache.kafka.common.network.Selector$SelectorMetrics.<init>(Selector.java:617) > at org.apache.kafka.common.network.Selector.<init>(Selector.java:138) > at > kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:101) > at > kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:45) > at > kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:45) > at scala.collection.immutable.Set$Set3.foreach(Set.scala:163) > at > kafka.controller.ControllerChannelManager.<init>(ControllerChannelManager.scala:45) > at > kafka.controller.KafkaController.startChannelManager(KafkaController.scala:814) > at > kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:742) > at > kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:334) > at > kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:167) > at > kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84) > at > kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146) > at > kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141) > at > kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234) > at > kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141) > at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:824) > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) >
{code} > We observe a tight loop of controller (re-)election, i.e. one node hits this > exception, and leadership transitions to the next, which then hits the > exception, ad infinitum. > Producers and consumers appear to be connecting ok, and are able to produce > and consume messages. > Relevant data points: > - prior to this cluster restart, a partition reassignment was attempted for > a number of topics, which appeared to get stuck in the "in progress" state > (on the order of days) > - these topics were subsequently deleted > - a rolling restart of the cluster was performed to turn on > broker-to-broker SSL communication > - the SSL change was subsequently _rolled back_ after we observed these > exceptions > - the entire cluster was shut down, and nodes brought back one at a time in > an attempt to clear the exception. We were able to restart the cluster, but > we continue to see the exceptions > We also observed, at the same time as the exception above, the following > exception on all hosts: > {code} > 2017-03-15 01:44:04,572 ERROR > [ZkClient-EventThread-36-samsa-zkserver.stage.sjc1.square:26101/samsa] > controller.ReplicaStateMachine$BrokerChangeListener - [BrokerChangeListener > on
[jira] [Commented] (KAFKA-4893) async topic deletion conflicts with max topic length
[ https://issues.apache.org/jira/browse/KAFKA-4893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927298#comment-15927298 ] Onur Karaman commented on KAFKA-4893: - Also [~vahid] while I agree reducing the limit is by far the easiest, it definitely feels like we're moving the goalpost. We should instead try to maintain our guarantees as long as it's not exceedingly difficult. In this scenario, it doesn't feel exceedingly difficult. > async topic deletion conflicts with max topic length > > > Key: KAFKA-4893 > URL: https://issues.apache.org/jira/browse/KAFKA-4893 > Project: Kafka > Issue Type: Bug >Reporter: Onur Karaman >Assignee: Vahid Hashemian >Priority: Minor > > As per the > [documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], > topics can be only 249 characters long to line up with typical filesystem > limitations: > {quote} > Each sharded partition log is placed into its own folder under the Kafka log > directory. The name of such folders consists of the topic name, appended by a > dash (\-) and the partition id. Since a typical folder name can not be over > 255 characters long, there will be a limitation on the length of topic names. > We assume the number of partitions will not ever be above 100,000. Therefore, > topic names cannot be longer than 249 characters. This leaves just enough > room in the folder name for a dash and a potentially 5 digit long partition > id. > {quote} > {{kafka.common.Topic.maxNameLength}} is set to 249 and is used during > validation. 
> This limit ends up not being quite right since topic deletion ends up > renaming the directory to the form {{topic-partition.uniqueId-delete}} as can > be seen in {{LogManager.asyncDelete}}: > {code} > val dirName = new StringBuilder(removedLog.name) > .append(".") > > .append(java.util.UUID.randomUUID.toString.replaceAll("-","")) > .append(Log.DeleteDirSuffix) > .toString() > {code} > So the unique id and "-delete" suffix end up hogging some of the characters. > Deleting a long-named topic results in a log message such as the following: > {code} > kafka.common.KafkaStorageException: Failed to rename log directory from > /tmp/kafka-logs0/0-0 > to > /tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete > at kafka.log.LogManager.asyncDelete(LogManager.scala:439) > at > kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142) > at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137) > at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213) > at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221) > at kafka.cluster.Partition.delete(Partition.scala:137) > at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:230) > at > kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:260) > at > kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:259) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:259) > at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:174) > at kafka.server.KafkaApis.handle(KafkaApis.scala:86) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:64) > at 
java.lang.Thread.run(Thread.java:745) > {code} > The topic after this point still exists but has Leader set to -1 and the > controller recognizes the topic deletion as incomplete (the topic znode is > still in /admin/delete_topics). > I don't believe LinkedIn has any topic name this long but I'm making the > ticket in case anyone runs into this problem. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
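To make the character budget in the description concrete, the arithmetic can be sketched as follows. This is only an illustration, not Kafka code: the class and method names are hypothetical, and the constants are the ones stated in the description (a 255-character folder-name limit, a partition id of up to 5 digits, a UUID with its dashes removed, and the "-delete" suffix).

```java
import java.util.UUID;

public class TopicLengthBudget {
    // Assumed constants, taken from the issue description rather than from Kafka's source.
    static final int MAX_DIR_NAME = 255;            // typical folder-name limit
    static final int MAX_PARTITION_DIGITS = 5;      // "potentially 5 digit long partition id"
    static final String DELETE_SUFFIX = "-delete";  // suffix named in the description

    // Length of "topic-partition" for the longest partition id.
    static int liveDirLength(int topicLength) {
        return topicLength + 1 + MAX_PARTITION_DIGITS;
    }

    // Length of "topic-partition.uniqueId-delete", the form built in the quoted snippet.
    static int deleteDirLength(int topicLength) {
        int uniqueIdLength = UUID.randomUUID().toString().replaceAll("-", "").length();
        return liveDirLength(topicLength) + 1 + uniqueIdLength + DELETE_SUFFIX.length();
    }

    // Longest topic name whose renamed directory still fits in MAX_DIR_NAME.
    static int maxSafeTopicLength() {
        return MAX_DIR_NAME - deleteDirLength(0);
    }

    public static void main(String[] args) {
        System.out.println(liveDirLength(249));    // prints 255
        System.out.println(deleteDirLength(249));  // prints 295
        System.out.println(maxSafeTopicLength());  // prints 209
    }
}
```

Under these assumptions the rename adds 40 characters (a dot, 32 hex digits, and the 7-character suffix), so a 249-character topic name would need a 295-character directory name, and only names of 209 characters or fewer would survive the rename.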
[jira] [Commented] (KAFKA-4900) Brokers stuck in controller re-election loop after failing to register metrics
[ https://issues.apache.org/jira/browse/KAFKA-4900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927200#comment-15927200 ]

Onur Karaman commented on KAFKA-4900:
-------------------------------------

I hadn't yet investigated why some of the fetchers hadn't started up, but it makes me wonder if we should change the controller-broker interaction such that either:
# the expected leadership/followership is periodically heartbeated from controller to broker
# the current leadership/followership is periodically heartbeated from broker to controller

> Brokers stuck in controller re-election loop after failing to register metrics
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-4900
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4900
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller, core
>    Affects Versions: 0.10.1.1
>            Reporter: Nick Travers
>
> We hit this today in one of our three-node staging clusters. The exception continues to occur on all three nodes.
> {code}
> 2017-03-15 02:17:30,677 ERROR [ZkClient-EventThread-35-samsa-zkserver.stage.sjc1.square:26101/samsa] server.ZookeeperLeaderElector - Error while electing or becoming leader on broker 9
> java.lang.IllegalArgumentException: A metric named 'MetricName [name=connection-close-rate, group=controller-channel-metrics,description=Connections closed per second in the window., tags={broker-id=10}]' already exists, can't register another one.
> at > org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:380) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:179) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:164) > at > org.apache.kafka.common.network.Selector$SelectorMetrics.(Selector.java:617) > at org.apache.kafka.common.network.Selector.(Selector.java:138) > at > kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:101) > at > kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:45) > at > kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:45) > at scala.collection.immutable.Set$Set3.foreach(Set.scala:163) > at > kafka.controller.ControllerChannelManager.(ControllerChannelManager.scala:45) > at > kafka.controller.KafkaController.startChannelManager(KafkaController.scala:814) > at > kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:742) > at > kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:334) > at > kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:167) > at > kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84) > at > kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146) > at > kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141) > at > kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234) > at > kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141) > at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:824) > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) > 
{code}
> We observe a tight loop of controller (re-)election, i.e. one node hits this exception, and leadership transitions to the next, which then hits the exception, ad infinitum.
> Producers and consumers appear to be connecting ok, and are able to produce and consume messages.
> Relevant data points:
> - prior to this cluster restart, a partition reassignment was attempted for a number of topics, which appeared to get stuck in the "in progress" state (on the order of days)
> - these topics were subsequently deleted
> - a rolling restart of the cluster was performed to turn on broker-to-broker SSL communication
> - the SSL change has subsequently been _rolled back_ after we observed these exceptions
> - the entire cluster was shut down, and nodes brought back one at a time in an attempt to clear the exception. We were able to restart the cluster, but we continue to see the exceptions
> We also observed, during the same time as the exception above, the following exception on all hosts:
> {code}
> 2017-03-15 01:44:04,572 ERROR [ZkClient-EventThread-36-samsa-zkserver.stage.sjc1.square:26101/samsa] >
[jira] [Comment Edited] (KAFKA-4893) async topic deletion conflicts with max topic length
[ https://issues.apache.org/jira/browse/KAFKA-4893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927155#comment-15927155 ]

Onur Karaman edited comment on KAFKA-4893 at 3/15/17 11:23 PM:
---------------------------------------------------------------

[~ijuma] I had thought about the additional fsync on the directory as well after posting the earlier comment. Would the following structure help?
{code}
kafkaLogDir
| -- cleaner-offset-checkpoint
| -- delete
|    | -- 8fc621a18fe746d1b8eb4a7fa55a04bc
|    |    | -- .index
|    |    | -- .log
|    |    | -- .timeindex
|    |    | -- t-0
| -- meta.properties
| -- recovery-point-offset-checkpoint
| -- replication-offset-checkpoint
{code}
We'd put all the partitions staged for async deletion under kafkaLogDir/delete. Each directory corresponding to a partition staged for async deletion would have a new empty file with filename topic-partition so you can figure out later on what partition the data belonged to.

The kafkaLogDir/delete directory would just be a trash bin directory that always exists, and we'd just delete all of the data inside the trash bin.

was (Author: onurkaraman):
[~ijuma] I had thought about the additional fsync on the directory as well after posting the earlier comment. Would the following structure help?
{code}
kafkaLogDir
| -- cleaner-offset-checkpoint
| -- delete
|    | -- 8fc621a18fe746d1b8eb4a7fa55a04bc
|    |    | -- .index
|    |    | -- .log
|    |    | -- .timeindex
|    |    | -- t-0
| -- meta.properties
| -- recovery-point-offset-checkpoint
| -- replication-offset-checkpoint
{code}
The kafkaLogDir/delete directory would always be there. We'd put all the partitions staged for async deletion under kafkaLogDir/delete. Each directory corresponding to a partition staged for async deletion would have a new empty file with filename topic-partition so you can figure out later on what partition the data belonged to.
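The trash-bin layout proposed in this comment could be staged roughly as sketched below. This is a hedged illustration using only java.nio.file, not Kafka's LogManager: the class name, the method name, and the choice of an atomic move are assumptions, and the sketch does not address the directory fsync question raised in the thread.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

public class TrashBinSketch {
    // Hypothetical helper: moves kafkaLogDir/<topic-partition> to
    // kafkaLogDir/delete/<uniqueId> and leaves an empty marker file named
    // after the partition, so the owner of the data can be identified later.
    static Path stageForDeletion(Path kafkaLogDir, String topicPartition) throws IOException {
        // The trash bin is created on demand here; the comment proposes that it always exists.
        Path trash = Files.createDirectories(kafkaLogDir.resolve("delete"));
        String uniqueId = UUID.randomUUID().toString().replace("-", "");
        Path staged = trash.resolve(uniqueId);
        Files.move(kafkaLogDir.resolve(topicPartition), staged, StandardCopyOption.ATOMIC_MOVE);
        Files.createFile(staged.resolve(topicPartition)); // empty marker file
        return staged;
    }

    public static void main(String[] args) throws IOException {
        Path logDir = Files.createTempDirectory("kafka-logs");
        Path partitionDir = Files.createDirectory(logDir.resolve("t-0"));
        Files.createFile(partitionDir.resolve("segment.log")); // stand-in for real segment files
        Path staged = stageForDeletion(logDir, "t-0");
        System.out.println(Files.exists(staged.resolve("t-0"))); // prints true
        System.out.println(Files.exists(logDir.resolve("t-0"))); // prints false
    }
}
```

In this layout no single path component grows beyond what already exists today: the staged directory name is the 32-character unique id, and the marker file name is the same topic-partition string the live directory used.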
[jira] [Comment Edited] (KAFKA-4900) Brokers stuck in controller re-election loop after failing to register metrics
[ https://issues.apache.org/jira/browse/KAFKA-4900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925611#comment-15925611 ]

Onur Karaman edited comment on KAFKA-4900 at 3/15/17 6:44 AM:
--------------------------------------------------------------

For reference, I think the controller was changing every 3 seconds or so during the re-election loop.

We were able to stabilize the cluster after about two hours but haven't yet figured out the root cause.

We didn't know the cause, so we definitely didn't take the optimal sequence of steps to stabilize the cluster. Anyway, here's what we did:
# We figured deleting the /controller znode wouldn't make a difference since it was changing every 3 seconds anyway.
# We initially tried bouncing what we had incorrectly thought was the sole broker corresponding to the metric collision, but this didn't help. The re-election loop continued, and we later found out in the logs that there had been "connection-close-rate" metrics collisions tagged with other brokers.
# We then shifted traffic out of the bad cluster, killed all of the brokers, and restarted them all. Getting the cluster back up and recovering its unflushed segments took around 25 minutes.
# While the re-election loop had stopped, we still saw under-replicated partitions after the cluster was back up and later found that some of the fetchers hadn't been started. I think at this point we just deleted the /controller znode to elect a new controller with the hopes of it broadcasting leadership/followership of partitions to the cluster so that the brokers' fetchers can start up correctly. After this, the under-replicated partitions came back down to approximately zero. We think the few under-replicated partitions here and there are from uneven partition leadership distribution but this wasn't a big deal so we haven't attempted to fix the few remaining under-replicated partitions yet.
was (Author: onurkaraman):
For reference, I think the controller was changing every 3 seconds or so during the re-election loop.

We were able to stabilize the cluster after about two hours but haven't yet figured out the root cause.

We didn't know the cause, so we definitely didn't take the optimal sequence of steps to stabilize the cluster. Anyway, here's what we did:
# We figured deleting the /controller znode wouldn't make a difference since it was changing every 3 seconds anyway.
# We initially tried bouncing what we had incorrectly thought was the sole broker corresponding to the metric collision, but this didn't help. The re-election loop continued, and we later found out in the logs that there had been "connection-close-rate" metrics collisions tagged with other brokers.
# We then shifted traffic out of the bad cluster, killed all of the brokers, and restarted them all. Getting the cluster back up and recovering its unflushed segments took around 25 minutes.
# We still saw under-replicated partitions after the cluster was back up and later found that some of the fetchers hadn't been started. I think at this point we just deleted the /controller znode to elect a new controller with the hopes of it broadcasting leadership/followership of partitions to the cluster so that the brokers' fetchers can start up correctly. After this, the under-replicated partitions came back down to approximately zero. We think the few under-replicated partitions here and there are from uneven partition leadership distribution but this wasn't a big deal so we haven't attempted to fix the few remaining under-replicated partitions yet.

> Brokers stuck in controller re-election loop after failing to register metrics
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-4900
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4900
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller, core
>    Affects Versions: 0.10.1.1
>            Reporter: Nick Travers
>
> We hit this today in one of our three-node staging clusters.
The exception > continues to occur on all three nodes. > {code} > 2017-03-15 02:17:30,677 ERROR > [ZkClient-EventThread-35-samsa-zkserver.stage.sjc1.square:26101/samsa] > server.ZookeeperLeaderElector - Error while electing or becoming leader on > broker 9 > java.lang.IllegalArgumentException: A metric named 'MetricName > [name=connection-close-rate, > group=controller-channel-metrics,description=Connections closed per second in > the window., tags={broker-id=10}]' already exists, can't register another one. > at > org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:380) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:179) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:164) > at > org.apache.kafka.common.network.Selector$SelectorMetrics.(Selector.java:617) > at
[jira] [Commented] (KAFKA-4900) Brokers stuck in controller re-election loop after failing to register metrics
[ https://issues.apache.org/jira/browse/KAFKA-4900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925503#comment-15925503 ]

Onur Karaman commented on KAFKA-4900:
-------------------------------------

We hit the same re-election loop in production at LinkedIn today: ZookeeperLeaderElector threw java.lang.IllegalArgumentException while attempting to register the "connection-close-rate" metric.

A similar issue of duplicate metric registration came up recently in KAFKA-4811 as well. Strange.

I don't think we saw the ClassCastException.

> Brokers stuck in controller re-election loop after failing to register metrics
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-4900
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4900
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller, core
>    Affects Versions: 0.10.1.1
>            Reporter: Nick Travers
>
> We hit this today in one of our three-node staging clusters. The exception continues to occur on all three nodes.
> {code}
> 2017-03-15 02:17:30,677 ERROR [ZkClient-EventThread-35-samsa-zkserver.stage.sjc1.square:26101/samsa] server.ZookeeperLeaderElector - Error while electing or becoming leader on broker 9
> java.lang.IllegalArgumentException: A metric named 'MetricName [name=connection-close-rate, group=controller-channel-metrics,description=Connections closed per second in the window., tags={broker-id=10}]' already exists, can't register another one.
> at > org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:380) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:179) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:164) > at > org.apache.kafka.common.network.Selector$SelectorMetrics.(Selector.java:617) > at org.apache.kafka.common.network.Selector.(Selector.java:138) > at > kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:101) > at > kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:45) > at > kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:45) > at scala.collection.immutable.Set$Set3.foreach(Set.scala:163) > at > kafka.controller.ControllerChannelManager.(ControllerChannelManager.scala:45) > at > kafka.controller.KafkaController.startChannelManager(KafkaController.scala:814) > at > kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:742) > at > kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:334) > at > kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:167) > at > kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84) > at > kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146) > at > kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141) > at > kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234) > at > kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141) > at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:824) > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) > 
{code}
> We observe a tight loop of controller (re-)election, i.e. one node hits this exception, and leadership transitions to the next, which then hits the exception, ad infinitum.
> Producers and consumers appear to be connecting ok, and are able to produce and consume messages.
> Relevant data points:
> - prior to this cluster restart, a repartition of a number of topics was attempted, which appeared to get stuck in the "in progress" state (on the order of days)
> - these topics were subsequently deleted
> - a rolling restart of the cluster was performed to turn on broker-to-broker SSL communication
> - the SSL change has subsequently been _rolled back_ after we observed these exceptions
> - the entire cluster was shut down, and nodes brought back one at a time in an attempt to clear the exception. We were able to restart the cluster, but we continue to see the exceptions
> We also observed, during the same time as the exception above, the following exception on all hosts:
> {code}
> 2017-03-15 01:44:04,572 ERROR [ZkClient-EventThread-36-samsa-zkserver.stage.sjc1.square:26101/samsa] controller.ReplicaStateMachine$BrokerChangeListener -
[jira] [Commented] (KAFKA-4896) Offset loading can use more threads
[ https://issues.apache.org/jira/browse/KAFKA-4896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924764#comment-15924764 ]

Onur Karaman commented on KAFKA-4896:
-------------------------------------

[~junrao] Naively bumping the number of scheduler threads might cause a concurrency bug if GroupCoordinator.handleGroupImmigration and GroupCoordinator.handleGroupEmigration are called concurrently.

handleGroupImmigration schedules a task (doLoadGroupsAndOffsets) that acquires the partitionLock, updates loadingPartitions, releases the partitionLock, loads the metadata into memory, acquires the partitionLock again, updates ownedPartitions and loadingPartitions, and then releases the partitionLock.

handleGroupEmigration schedules a task (removeGroupsAndOffsets) that acquires the partitionLock, updates ownedPartitions, clears the metadata from memory, and then releases the partitionLock.

With more scheduler threads, removeGroupsAndOffsets could run between the two windows in which doLoadGroupsAndOffsets holds the partitionLock, clearing metadata while doLoadGroupsAndOffsets is still loading it.

> Offset loading can use more threads
> -----------------------------------
>
>                 Key: KAFKA-4896
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4896
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>    Affects Versions: 0.10.2.0
>            Reporter: Jun Rao
>              Labels: newbie
>
> Currently, in GroupMetadataManager, we have a single thread for loading the offset cache. We could speed it up with more threads.
> /* single-thread scheduler to handle offset/group metadata cache loading and unloading */
> private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "group-metadata-manager-")

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
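The interleaving described in this comment can be reproduced deterministically with a small model. The sketch below is not GroupMetadataManager: the class, its fields, and the midLoad hook are hypothetical stand-ins that only mirror the locking shape described above (two locked windows around an unlocked load, and a single locked removal). The hook plays the role of a second scheduler thread running the removal between the two windows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

public class OffsetLoadRaceSketch {
    private final ReentrantLock partitionLock = new ReentrantLock();
    private final Set<Integer> loadingPartitions = new HashSet<>();
    private final Set<Integer> ownedPartitions = new HashSet<>();
    // partition -> cached group ids; a stand-in for the in-memory metadata cache
    private final Map<Integer, List<String>> cache = new HashMap<>();

    // Models the described load task. midLoad runs after the first record is
    // cached, i.e. inside the unlocked gap between the two lock windows.
    void load(int partition, List<String> records, Runnable midLoad) {
        partitionLock.lock();
        try { loadingPartitions.add(partition); } finally { partitionLock.unlock(); }

        for (int i = 0; i < records.size(); i++) { // no lock held here
            cache.computeIfAbsent(partition, p -> new ArrayList<>()).add(records.get(i));
            if (i == 0) midLoad.run();
        }

        partitionLock.lock();
        try {
            ownedPartitions.add(partition);
            loadingPartitions.remove(partition);
        } finally { partitionLock.unlock(); }
    }

    // Models the described removal task: everything happens under the lock.
    void remove(int partition) {
        partitionLock.lock();
        try {
            ownedPartitions.remove(partition);
            cache.remove(partition);
        } finally { partitionLock.unlock(); }
    }

    boolean owns(int partition) { return ownedPartitions.contains(partition); }

    List<String> cached(int partition) {
        return cache.getOrDefault(partition, Collections.emptyList());
    }

    public static void main(String[] args) {
        OffsetLoadRaceSketch m = new OffsetLoadRaceSketch();
        m.load(0, Arrays.asList("group-a", "group-b"), () -> m.remove(0));
        System.out.println(m.owns(0) + " " + m.cached(0)); // prints: true [group-b]
    }
}
```

In this model the partition ends up marked as owned while "group-a" is missing from the cache, even though every individual update happened under the lock. With a single scheduler thread the two tasks cannot interleave this way, which is presumably why the current code is safe.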
[jira] [Commented] (KAFKA-4893) async topic deletion conflicts with max topic length
[ https://issues.apache.org/jira/browse/KAFKA-4893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923636#comment-15923636 ]

Onur Karaman commented on KAFKA-4893:
-------------------------------------

Maybe the right move is to just add more layers to the log directory like so:
{code}
kafkaLogDir/delete/topic-partition/uniqueId/
{code}
and put all the data under that directory. I think typical filesystems limit the full path to 4096 characters, so we should be good to go.

The one annoyance is that the old structure is already out in the wild, so you may need to recognize and handle both delete directory structures for some time.

> async topic deletion conflicts with max topic length
> ----------------------------------------------------
>
>                 Key: KAFKA-4893
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4893
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Onur Karaman
>            Assignee: Vahid Hashemian
>            Priority: Minor
>
> As per the [documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], topics can be only 249 characters long to line up with typical filesystem limitations:
> {quote}
> Each sharded partition log is placed into its own folder under the Kafka log directory. The name of such folders consists of the topic name, appended by a dash (\-) and the partition id. Since a typical folder name can not be over 255 characters long, there will be a limitation on the length of topic names. We assume the number of partitions will not ever be above 100,000. Therefore, topic names cannot be longer than 249 characters. This leaves just enough room in the folder name for a dash and a potentially 5 digit long partition id.
> {quote}
> {{kafka.common.Topic.maxNameLength}} is set to 249 and is used during validation.
> This limit ends up not being quite right since topic deletion ends up > renaming the directory to the form {{topic-partition.uniqueId-delete}} as can > be seen in {{LogManager.asyncDelete}}: > {code} > val dirName = new StringBuilder(removedLog.name) > .append(".") > > .append(java.util.UUID.randomUUID.toString.replaceAll("-","")) > .append(Log.DeleteDirSuffix) > .toString() > {code} > So the unique id and "-delete" suffix end up hogging some of the characters. > Deleting a long-named topic results in a log message such as the following: > {code} > kafka.common.KafkaStorageException: Failed to rename log directory from > /tmp/kafka-logs0/0-0 > to > /tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete > at kafka.log.LogManager.asyncDelete(LogManager.scala:439) > at > kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142) > at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137) > at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213) > at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221) > at kafka.cluster.Partition.delete(Partition.scala:137) > at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:230) > at > kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:260) > at > kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:259) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:259) > at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:174) > at kafka.server.KafkaApis.handle(KafkaApis.scala:86) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:64) > at 
java.lang.Thread.run(Thread.java:745) > {code} > The topic after this point still exists but has Leader set to -1 and the > controller recognizes the topic deletion as incomplete (the topic znode is > still in /admin/delete_topics). > I don't believe LinkedIn has any topic name this long but I'm making the > ticket in case anyone runs into this problem. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
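If the nested layout suggested in the comment above were adopted, a broker would have to recognize both delete-directory layouts for some time. Here is a minimal sketch of that recognition, assuming the unique id stays a 32-character hex string as in the ticket's log output. The class and method names are hypothetical and not part of Kafka:

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class DeleteDirLayouts {
    // Existing layout described in the ticket: topic-partition.uniqueId-delete
    private static final Pattern FLAT =
            Pattern.compile("^(.+-\\d+)\\.[0-9a-f]{32}-delete$");
    // Layout proposed in the comment (hypothetical): delete/topic-partition/uniqueId
    private static final Pattern NESTED =
            Pattern.compile("^delete/(.+-\\d+)/[0-9a-f]{32}$");

    // Returns the "topic-partition" part if the path matches either layout.
    static Optional<String> topicPartition(String pathRelativeToLogDir) {
        for (Pattern layout : new Pattern[] {FLAT, NESTED}) {
            Matcher m = layout.matcher(pathRelativeToLogDir);
            if (m.matches()) {
                return Optional.of(m.group(1));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(topicPartition("0-0.797bba3fb2464729840f87769243edbb-delete"));
        System.out.println(topicPartition("delete/0-0/797bba3fb2464729840f87769243edbb"));
        System.out.println(topicPartition("0-0")); // a live partition directory, not a delete directory
    }
}
```

Trying the flat pattern first keeps directories that are already out in the wild recognizable while new deletions could move to the nested form.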
[jira] [Created] (KAFKA-4893) async topic deletion conflicts with max topic length
Onur Karaman created KAFKA-4893: --- Summary: async topic deletion conflicts with max topic length Key: KAFKA-4893 URL: https://issues.apache.org/jira/browse/KAFKA-4893 Project: Kafka Issue Type: Bug Reporter: Onur Karaman Priority: Minor As per the [documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], topics can be only 249 characters long to line up with typical filesystem limitations: {quote} Each sharded partition log is placed into its own folder under the Kafka log directory. The name of such folders consists of the topic name, appended by a dash (\-) and the partition id. Since a typical folder name can not be over 255 characters long, there will be a limitation on the length of topic names. We assume the number of partitions will not ever be above 100,000. Therefore, topic names cannot be longer than 249 characters. This leaves just enough room in the folder name for a dash and a potentially 5 digit long partition id. {quote} {{kafka.common.Topic.maxNameLength}} is set to 249 and is used during validation. This limit ends up not being quite right since topic deletion ends up renaming the directory to the form {{topic-partition.uniqueId-delete}} as can be seen in {{LogManager.asyncDelete}}: {code} val dirName = new StringBuilder(removedLog.name) .append(".") .append(java.util.UUID.randomUUID.toString.replaceAll("-","")) .append(Log.DeleteDirSuffix) .toString() {code} So the unique id and "-delete" suffix end up hogging some of the characters. 
Deleting a long-named topic results in a log message such as the following: {code} kafka.common.KafkaStorageException: Failed to rename log directory from /tmp/kafka-logs0/0-0 to /tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete at kafka.log.LogManager.asyncDelete(LogManager.scala:439) at kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142) at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137) at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213) at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221) at kafka.cluster.Partition.delete(Partition.scala:137) at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:230) at kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:260) at kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:259) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:259) at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:174) at kafka.server.KafkaApis.handle(KafkaApis.scala:86) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:64) at java.lang.Thread.run(Thread.java:745) {code} The topic after this point still exists but has Leader set to -1 and the controller recognizes the topic deletion as incomplete (the topic znode is still in /admin/delete_topics). I don't believe LinkedIn has any topic name this long but I'm making the ticket in case anyone runs into this problem. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
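The length arithmetic behind this ticket can be sketched as follows. The block mirrors the name construction shown in {{LogManager.asyncDelete}}, assuming {{Log.DeleteDirSuffix}} is the "-delete" string the ticket mentions and taking the 255-character folder-name limit from the quoted documentation. The class and method names are illustrative only:

```java
import java.util.UUID;

final class DeleteDirNameLength {
    static final int MAX_DIR_NAME_LENGTH = 255;        // folder-name limit quoted from the docs
    static final String DELETE_DIR_SUFFIX = "-delete"; // assumed value of Log.DeleteDirSuffix

    // Same shape as the rename target in the ticket: topic-partition.uniqueId-delete
    static String deleteDirName(String topic, int partition) {
        String uniqueId = UUID.randomUUID().toString().replaceAll("-", ""); // 32 hex characters
        return topic + "-" + partition + "." + uniqueId + DELETE_DIR_SUFFIX;
    }

    // Longest topic name whose delete directory still fits in MAX_DIR_NAME_LENGTH.
    static int maxTopicLengthSurvivingDelete(int maxPartitionDigits) {
        int uniqueIdLength = 32;
        int overhead = 1 + maxPartitionDigits + 1 + uniqueIdLength + DELETE_DIR_SUFFIX.length();
        return MAX_DIR_NAME_LENGTH - overhead;
    }

    static String repeat(char c, int n) {
        StringBuilder sb = new StringBuilder(n);
        for (int i = 0; i < n; i++) {
            sb.append(c);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String longestAllowedTopic = repeat('a', 249);
        // 249 + 1 + 5 + 1 + 32 + 7 = 295 characters, well past the 255 limit
        System.out.println(deleteDirName(longestAllowedTopic, 99999).length());
        // 255 - (1 + 5 + 1 + 32 + 7) = 209
        System.out.println(maxTopicLengthSurvivingDelete(5));
    }
}
```

Under these assumptions, a topic name would have to stay at or below 209 characters for the renamed directory to fit, which is 40 characters fewer than the validated maximum of 249.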
[jira] [Created] (KAFKA-4891) kafka.request.logger TRACE regression
Onur Karaman created KAFKA-4891: --- Summary: kafka.request.logger TRACE regression Key: KAFKA-4891 URL: https://issues.apache.org/jira/browse/KAFKA-4891 Project: Kafka Issue Type: Bug Reporter: Onur Karaman Assignee: Onur Karaman Here's what kafka-request.log shows with {{kafka.request.logger}} set to TRACE: {code} [2017-03-13 10:06:24,402] TRACE Completed request:org.apache.kafka.common.requests.RequestHeader@2f99ef87 -- org.apache.kafka.common.requests.LeaderAndIsrRequest@34c40fc5 from connection 127.0.0.1:9090-127.0.0.1:50969;totalTime:125,requestQueueTime:0,localTime:124,remoteTime:0,responseQueueTime:0,sendTime:1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT (kafka.request.logger) [2017-03-13 10:06:24,406] TRACE Completed request:org.apache.kafka.common.requests.RequestHeader@133b1de8 -- org.apache.kafka.common.requests.UpdateMetadataRequest@795826d from connection 127.0.0.1:9090-127.0.0.1:50969;totalTime:3,requestQueueTime:0,localTime:2,remoteTime:0,responseQueueTime:0,sendTime:1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT (kafka.request.logger) [2017-03-13 10:06:24,943] TRACE Completed request:org.apache.kafka.common.requests.RequestHeader@76a9acaa -- org.apache.kafka.common.requests.FetchRequest@33ab3c1b from connection 127.0.0.1:9090-127.0.0.1:50976;totalTime:522,requestQueueTime:0,localTime:13,remoteTime:506,responseQueueTime:0,sendTime:1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT (kafka.request.logger) {code} Both the headers and requests have regressed to just show object ids instead of their contents from their underlying structs. 
I'm guessing this regression came from commit [fc1cfe475e8ae8458d8ddf119ce18d0c64653a70|https://github.com/apache/kafka/commit/fc1cfe475e8ae8458d8ddf119ce18d0c64653a70] The logs should look something like this: {code} [2017-03-13 10:14:36,754] TRACE Completed request:{api_key=4,api_version=0,correlation_id=2,client_id=0} -- {controller_id=0,controller_epoch=1,partition_states=[{topic=t,partition=5,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=4,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=3,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=2,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=1,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=0,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=7,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=6,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]}],live_leaders=[{id=0,host=localhost,port=9090},{id=1,host=localhost,port=9091}]} from connection 127.0.0.1:9090-127.0.0.1:51349;totalTime:155,requestQueueTime:0,localTime:155,remoteTime:0,responseQueueTime:0,sendTime:0,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT (kafka.request.logger) [2017-03-13 10:14:36,758] TRACE Completed request:{api_key=6,api_version=3,correlation_id=3,client_id=0} -- 
{controller_id=0,controller_epoch=1,partition_states=[{topic=t,partition=5,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=4,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=3,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=2,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=1,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=0,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=7,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=6,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]}],live_brokers=[{id=1,end_points=[{port=9091,host=localhost,listener_name=PLAINTEXT,security_protocol_type=0}],rack=null},{id=0,end_points=[{port=9090,host=localhost,listener_name=PLAINTEXT,security_protocol_type=0}],rack=null}]} from connection 127.0.0.1:9090-127.0.0.1:51349;totalTime:3,requestQueueTime:1,localTime:2,remoteTime:0,responseQueueTime:0,sendTime:0,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT (kafka.request.logger) [2017-03-13 10:14:37,297] TRACE Completed request:{api_key=1,api_version=3,correlation_id=0,client_id=ReplicaFetcherThread-0-0} --
[jira] [Commented] (KAFKA-1895) Investigate moving deserialization and decompression out of KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-1895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15879561#comment-15879561 ] Onur Karaman commented on KAFKA-1895: - I think it's worth defining the relation between the two problems mentioned earlier: # no means of access to raw FetchResponse data # lack of a separate IO thread I think Problem 1 is more of a performance problem while Problem 2 is a performance and usability problem (KAFKA-4753 shows that this can lead to starvation). Addressing Problem 1 doesn't solve Problem 2. Addressing Problem 2 partially solves Problem 1. With a solution to Problem 2, we have the potential to also do the decompression/deserialization in the separate IO thread, removing decompression-in-user-thread performance concerns. But this wouldn't address the decompression-then-recompression performance concerns in MirrorMaker or perhaps some stream processing use-cases. I think we need to solve both problems. > Investigate moving deserialization and decompression out of KafkaConsumer > - > > Key: KAFKA-1895 > URL: https://issues.apache.org/jira/browse/KAFKA-1895 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Jay Kreps > > The consumer implementation in KAFKA-1760 decompresses fetch responses and > deserializes them into ConsumerRecords which are then handed back as the > result of poll(). > There are several downsides to this: > 1. It is impossible to scale serialization and decompression work beyond the > single thread running the KafkaConsumer. > 2. The results can come back during the processing of other calls such as > commit() etc which can result in caching these records a little longer. > An alternative would be to have ConsumerRecords wrap the actual compressed > serialized MemoryRecords chunks and do the deserialization during iteration. > This way you could scale this over a thread pool if needed. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
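To make the "separate IO thread" idea from the comment above concrete, here is a rough sketch and not KafkaConsumer's actual design. A background thread stands in for network IO plus deserialization and hands finished records to the user thread through a queue. All names are hypothetical, and the "raw responses" are plain byte arrays decoded as UTF-8 strings:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class IoThreadSketch {
    // Records that have been fully received and deserialized by the IO thread.
    private final BlockingQueue<String> ready = new LinkedBlockingQueue<>();

    IoThreadSketch(List<byte[]> rawResponses) {
        Thread io = new Thread(() -> {
            for (byte[] raw : rawResponses) {
                // Deserialization happens here, off the user thread.
                ready.add(new String(raw, StandardCharsets.UTF_8));
            }
        }, "sketch-io-thread");
        io.setDaemon(true);
        io.start();
    }

    // User-facing poll: only waits on the queue and never drives IO itself.
    String poll(long timeoutMs) {
        try {
            return ready.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        IoThreadSketch consumer = new IoThreadSketch(Arrays.asList(
                "first".getBytes(StandardCharsets.UTF_8),
                "second".getBytes(StandardCharsets.UTF_8)));
        System.out.println(consumer.poll(1000));
        System.out.println(consumer.poll(1000));
    }
}
```

The sketch only covers the decompression-in-user-thread concern. A raw-access API (Problem 1) would still be needed to avoid decompress-then-recompress work in tools like MirrorMaker.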
[jira] [Commented] (KAFKA-4753) KafkaConsumer susceptible to FetchResponse starvation
[ https://issues.apache.org/jira/browse/KAFKA-4753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15879558#comment-15879558 ] Onur Karaman commented on KAFKA-4753: - [~ijuma] I don't think mitigation is the way to go. The two solutions I can think of are to either: 1. introduce some sort of fair scheduling logic that polls against all brokers but only returns records to the user by waiting in a round-robin manner for a completed FetchResponse from a given broker. This would damage performance since you're blocking the processing of fully formed FetchResponses on the slowest fetch. 2. do all IO in a separate thread. I think we've been gravitating towards this since the beginning. - January 2015: In KAFKA-1760, KafkaConsumer was implemented as being entirely single-threaded with user-driven fetches and heartbeats. - July 2015: I sent out an [email|https://www.mail-archive.com/dev@kafka.apache.org/msg31447.html] stating concerns with this approach, including the user-driven fetches and heartbeats. Becket and I proposed a [rough design for the fix|https://cwiki.apache.org/confluence/display/KAFKA/New+consumer+API+change+proposal] back then. Some nice API tweaks were made as a result of the discussion but the user-driven fetches and heartbeats remained. - October 2015: In KAFKA-2397 I added LeaveGroupRequest to reduce the turnaround between a controlled client shutdown and the rebalancing of the group. - February 2016: In KIP-41 we added "max.poll.records" to mitigate the impact of record processing on the user-driven heartbeats. - August 2016: In KIP-62 (KAFKA-3888) we added a separate heartbeat thread, synchronization logic surrounding NetworkClient, and retained poll-based liveness by automatically sending out LeaveGroupRequest when "max.poll.interval.ms" wasn't honored. - February 2017: Some of the simplest uses of KafkaConsumer with manual partition assignment suffer from starvation again because of the user-driven fetch design decision. 
I think we should try to make solution 2 work. We can retain the poll-based liveness by continuing what we do today: automatic sending of LeaveGroupRequest when "max.poll.interval.ms" isn't honored. This fix could also help KAFKA-1895 as well, as we can potentially push decompression/deserialization in the separate IO thread. > KafkaConsumer susceptible to FetchResponse starvation > - > > Key: KAFKA-4753 > URL: https://issues.apache.org/jira/browse/KAFKA-4753 > Project: Kafka > Issue Type: Bug >Reporter: Onur Karaman >Assignee: Onur Karaman > > FetchResponse starvation here means that the KafkaConsumer repeatedly fails > to fully form FetchResponses within the request timeout from a subset of the > brokers its fetching from while FetchResponses from the other brokers can get > fully formed and processed by the application. > In other words, this ticket is concerned with scenarios where fetching from > some brokers hurts the progress of fetching from other brokers to the point > of repeatedly hitting a request timeout. > Some FetchResponse starvation scenarios: > 1. partition leadership of the consumer's assigned partitions is skewed > across brokers, causing uneven FetchResponse sizes across brokers. > 2. the consumer seeks back on partitions on some brokers but not others, > causing uneven FetchResponse sizes across brokers. > 3. the consumer's ability to keep up with various partitions across brokers > is skewed, causing uneven FetchResponse sizes across brokers. > I've personally seen scenario 1 happen this past week to one of our users in > prod. They manually assigned partitions such that a few brokers led most of > the partitions while other brokers only led a single partition. When > NetworkClient sends out FetchRequests to different brokers in parallel with > an uneven partition distribution, FetchResponses from brokers who lead more > partitions will contain more data than FetchResponses from brokers who lead > few partitions. 
This means the small FetchResponses will get fully formed > quicker than larger FetchResponses. When the application eventually consumes > a smaller fully formed FetchResponses, the NetworkClient will send out a new > FetchRequest to the lightly-loaded broker. Their response will again come > back quickly while only marginal progress has been made on the larger > FetchResponse. Repeat this process several times and your application will > have potentially processed many smaller FetchResponses while the larger > FetchResponse made minimal progress and is forced to timeout, causing the > large FetchResponse to start all over again, which causes starvation. > To mitigate the problem for the short term, I've suggested to our user that > they either: > 1. bump up their "receive.buffer.bytes" beyond the current
[jira] [Commented] (KAFKA-4757) Improve NetworkClient trace logging of request details
[ https://issues.apache.org/jira/browse/KAFKA-4757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15872616#comment-15872616 ] Onur Karaman commented on KAFKA-4757: - Left some comments on the PR. Speaking of which, usually the PR itself should appear on the jira ticket. I wonder if it's because the PR title isn't in the KAFKA-WXYZ:title format as defined here: https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes > Improve NetworkClient trace logging of request details > -- > > Key: KAFKA-4757 > URL: https://issues.apache.org/jira/browse/KAFKA-4757 > Project: Kafka > Issue Type: Bug >Reporter: Onur Karaman >Assignee: Colin P. McCabe > > Two issues here: > 1. Here's what NetworkClient now shows when processing a disconnection: > {code} > [2017-02-10 10:48:57,052] TRACE Cancelled request > org.apache.kafka.clients.NetworkClient$InFlightRequest@52f759d7 due to node 0 > being disconnected (org.apache.kafka.clients.NetworkClient) > {code} > The log at one point was useful and actually showed the contents of the > request. For instance, with FetchRequest, you used to be able to see which > partitions were requested as well as the offsets and max bytes requested per > partition. > It looks like InFlightRequest itself doesn't actually hold the request but > instead currently just holds the RequestHeader. We probably want to make > InFlightRequest hold the entire request to make the original request show up > in the logs. > 2. Sometimes I see the following log: > {code} > [2017-02-10 10:53:59,015] TRACE Sending {} to node 0. > (org.apache.kafka.clients.NetworkClient) > {code} > Again not very insightful. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-4757) NetworkClient should log request details at trace level when a request is cancelled because of disconnection
[ https://issues.apache.org/jira/browse/KAFKA-4757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15872390#comment-15872390 ] Onur Karaman commented on KAFKA-4757: - Ah. Might've found it: Here's what ApiVersionsRequest schema looks like in org.apache.kafka.common.protocol.Protocol: {code} public static final Schema API_VERSIONS_REQUEST_V0 = new Schema(); {code} > NetworkClient should log request details at trace level when a request is > cancelled because of disconnection > > > Key: KAFKA-4757 > URL: https://issues.apache.org/jira/browse/KAFKA-4757 > Project: Kafka > Issue Type: Bug >Reporter: Onur Karaman >Assignee: Colin P. McCabe > > Two issues here: > 1. Here's what NetworkClient now shows when processing a disconnection: > {code} > [2017-02-10 10:48:57,052] TRACE Cancelled request > org.apache.kafka.clients.NetworkClient$InFlightRequest@52f759d7 due to node 0 > being disconnected (org.apache.kafka.clients.NetworkClient) > {code} > The log at one point was useful and actually showed the contents of the > request. For instance, with FetchRequest, you used to be able to see which > partitions were requested as well as the offsets and max bytes requested per > partition. > It looks like InFlightRequest itself doesn't actually hold the request but > instead currently just holds the RequestHeader. We probably want to make > InFlightRequest hold the entire request to make the original request show up > in the logs. > 2. Sometimes I see the following log: > {code} > [2017-02-10 10:53:59,015] TRACE Sending {} to node 0. > (org.apache.kafka.clients.NetworkClient) > {code} > Again not very insightful. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
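The empty schema explains the {{Sending {} to node}} line: a struct with no fields has nothing to print between the braces. Here is a small stand-in that illustrates this. It is not Kafka's Struct class; the renderer and its field map are hypothetical and only mimic the {{key=value}} style seen in the trace logs:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

final class StructRenderSketch {
    // Renders fields as {k1=v1,k2=v2}; with no fields the result is just "{}".
    static String render(Map<String, Object> fields) {
        StringJoiner joiner = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, Object> field : fields.entrySet()) {
            joiner.add(field.getKey() + "=" + field.getValue());
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // A request body whose schema declares no fields, like API_VERSIONS_REQUEST_V0.
        Map<String, Object> emptyBody = new LinkedHashMap<>();
        System.out.println("Sending " + render(emptyBody) + " to node 1.");

        // A header-like struct with fields, for contrast.
        Map<String, Object> header = new LinkedHashMap<>();
        header.put("api_key", 18);
        header.put("api_version", 0);
        System.out.println("Sending " + render(header) + " to node 1.");
    }
}
```

Logging the request header or API key alongside the body would make such lines identifiable even when the body is legitimately empty.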
[jira] [Commented] (KAFKA-4757) NetworkClient should log request details at trace level when a request is cancelled because of disconnection
[ https://issues.apache.org/jira/browse/KAFKA-4757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15872284#comment-15872284 ] Onur Karaman commented on KAFKA-4757: - [~cmccabe] I double checked and the 2nd logging issue still appears after KAFKA-4635. My trunk is currently at git hash 2c91b324d4798138dd479f54269bdc4f39339817. My initial guess is that it's related to ApiVersionsRequest, as the line following the bad log line usually contains a completed receive of api_key=18: {code} [2017-02-17 10:38:40,312] TRACE Sending {} to node 1. (org.apache.kafka.clients.NetworkClient) [2017-02-17 10:38:40,312] TRACE Completed receive from node 1, for key 18, received {error_code=0,api_versions=[{api_key=0,min_version=0,max_version=2},{api_key=1,min_version=0,max_version=3},{api_key=2,min_version=0,max_version=1},{api_key=3,min_version=0,max_version=2},{api_key=4,min_version=0,max_version=0},{api_key=5,min_version=0,max_version=0},{api_key=6,min_version=0,max_version=3},{api_key=7,min_version=1,max_version=1},{api_key=8,min_version=0,max_version=2},{api_key=9,min_version=0,max_version=2},{api_key=10,min_version=0,max_version=0},{api_key=11,min_version=0,max_version=1},{api_key=12,min_version=0,max_version=0},{api_key=13,min_version=0,max_version=0},{api_key=14,min_version=0,max_version=0},{api_key=15,min_version=0,max_version=0},{api_key=16,min_version=0,max_version=0},{api_key=17,min_version=0,max_version=0},{api_key=18,min_version=0,max_version=0},{api_key=19,min_version=0,max_version=1},{api_key=20,min_version=0,max_version=0}]} (org.apache.kafka.clients.NetworkClient) {code} > NetworkClient should log request details at trace level when a request is > cancelled because of disconnection > > > Key: KAFKA-4757 > URL: https://issues.apache.org/jira/browse/KAFKA-4757 > Project: Kafka > Issue Type: Bug >Reporter: Onur Karaman >Assignee: Colin P. McCabe > > Two issues here: > 1. 
Here's what NetworkClient now shows when processing a disconnection: > {code} > [2017-02-10 10:48:57,052] TRACE Cancelled request > org.apache.kafka.clients.NetworkClient$InFlightRequest@52f759d7 due to node 0 > being disconnected (org.apache.kafka.clients.NetworkClient) > {code} > The log at one point was useful and actually showed the contents of the > request. For instance, with FetchRequest, you used to be able to see which > partitions were requested as well as the offsets and max bytes requested per > partition. > It looks like InFlightRequest itself doesn't actually hold the request but > instead currently just holds the RequestHeader. We probably want to make > InFlightRequest hold the entire request to make the original request show up > in the logs. > 2. Sometimes I see the following log: > {code} > [2017-02-10 10:53:59,015] TRACE Sending {} to node 0. > (org.apache.kafka.clients.NetworkClient) > {code} > Again not very insightful. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-4676) Kafka consumers gets stuck for some partitions
[ https://issues.apache.org/jira/browse/KAFKA-4676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15872213#comment-15872213 ] Onur Karaman commented on KAFKA-4676: - This ticket never really found a root cause so it might not be the same problem as the starvation problem mentioned in KAFKA-4753 but it'd be convenient to link them together nonetheless. > Kafka consumers gets stuck for some partitions > -- > > Key: KAFKA-4676 > URL: https://issues.apache.org/jira/browse/KAFKA-4676 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.1.0 >Reporter: Vishal Shukla >Priority: Critical > Labels: consumer, reliability > Attachments: restart-node2-consumer-node-1.log, > restart-node2-consumer-node-2.log, stuck-case2.log, > stuck-consumer-node-1.log, stuck-consumer-node-2.log, > stuck-topic-thread-dump.log > > > We recently upgraded to Kafka 0.10.1.0. We are frequently facing issue that > Kafka consumers get stuck suddenly for some partitions. > Attached thread dump. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-4676) Kafka consumers gets stuck for some partitions
[ https://issues.apache.org/jira/browse/KAFKA-4676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15872199#comment-15872199 ] Onur Karaman commented on KAFKA-4676: - [~Narendra Kumar] if you're repeatedly seeing the request timeout coming from the same subset of brokers, you might be hitting the issue I described in KAFKA-4753. I mention a few tricks to mitigate the issue in the ticket. I think I have a few solutions to the problem but they're more involved. > Kafka consumers gets stuck for some partitions > -- > > Key: KAFKA-4676 > URL: https://issues.apache.org/jira/browse/KAFKA-4676 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.1.0 >Reporter: Vishal Shukla >Priority: Critical > Labels: consumer, reliability > Attachments: restart-node2-consumer-node-1.log, > restart-node2-consumer-node-2.log, stuck-case2.log, > stuck-consumer-node-1.log, stuck-consumer-node-2.log, > stuck-topic-thread-dump.log > > > We recently upgraded to Kafka 0.10.1.0. We are frequently facing issue that > Kafka consumers get stuck suddenly for some partitions. > Attached thread dump. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
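For readers landing here, the short-term mitigations described in KAFKA-4753 map onto ordinary consumer configs. This is a sketch only: the config keys are the real consumer settings named in that ticket, while the 60-second timeout is an assumed value chosen for illustration, and the class name is hypothetical:

```java
import java.util.Properties;

final class StarvationMitigationConfig {
    // Builds consumer overrides for the short-term mitigations from KAFKA-4753.
    static Properties mitigated() {
        Properties props = new Properties();
        // Mitigation 1: raise the socket receive buffer from the 64 KB default to 1 MB.
        props.setProperty("receive.buffer.bytes", String.valueOf(1024 * 1024));
        // Mitigation 4: raise the request timeout above 40 seconds (60 s is an assumption).
        props.setProperty("request.timeout.ms", "60000");
        // Mitigation 2 (not advised in the ticket, as it could impact broker performance):
        // props.setProperty("max.partition.fetch.bytes", String.valueOf(100 * 1024));
        return props;
    }

    public static void main(String[] args) {
        mitigated().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

These settings only reduce the likelihood of starvation; they do not remove the underlying cause.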
[jira] [Commented] (KAFKA-4757) NetworkClient should log request details at trace level when a request is cancelled because of disconnection
[ https://issues.apache.org/jira/browse/KAFKA-4757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15872194#comment-15872194 ] Onur Karaman commented on KAFKA-4757: - Thanks [~cmccabe]. Regarding the second issue, I'll need to double check if I already pulled in the patch. > NetworkClient should log request details at trace level when a request is > cancelled because of disconnection > > > Key: KAFKA-4757 > URL: https://issues.apache.org/jira/browse/KAFKA-4757 > Project: Kafka > Issue Type: Bug >Reporter: Onur Karaman >Assignee: Colin P. McCabe > > Two issues here: > 1. Here's what NetworkClient now shows when processing a disconnection: > {code} > [2017-02-10 10:48:57,052] TRACE Cancelled request > org.apache.kafka.clients.NetworkClient$InFlightRequest@52f759d7 due to node 0 > being disconnected (org.apache.kafka.clients.NetworkClient) > {code} > The log at one point was useful and actually showed the contents of the > request. For instance, with FetchRequest, you used to be able to see which > partitions were requested as well as the offsets and max bytes requested per > partition. > It looks like InFlightRequest itself doesn't actually hold the request but > instead currently just holds the RequestHeader. We probably want to make > InFlightRequest hold the entire request to make the original request show up > in the logs. > 2. Sometimes I see the following log: > {code} > [2017-02-10 10:53:59,015] TRACE Sending {} to node 0. > (org.apache.kafka.clients.NetworkClient) > {code} > Again not very insightful. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-4757) NetworkClient trace level logging regressions
Onur Karaman created KAFKA-4757: --- Summary: NetworkClient trace level logging regressions Key: KAFKA-4757 URL: https://issues.apache.org/jira/browse/KAFKA-4757 Project: Kafka Issue Type: Bug Reporter: Onur Karaman Two issues here: 1. Here's what NetworkClient now shows when processing a disconnection: {code} [2017-02-10 10:48:57,052] TRACE Cancelled request org.apache.kafka.clients.NetworkClient$InFlightRequest@52f759d7 due to node 0 being disconnected (org.apache.kafka.clients.NetworkClient) {code} The log at one point was useful and actually showed the contents of the request. For instance, with FetchRequest, you used to be able to see which partitions were requested as well as the offsets and max bytes requested per partition. It looks like InFlightRequest itself doesn't actually hold the request but instead currently just holds the RequestHeader. We probably want to make InFlightRequest hold the entire request to make the original request show up in the logs. 2. Sometimes I see the following log: {code} [2017-02-10 10:53:59,015] TRACE Sending {} to node 0. (org.apache.kafka.clients.NetworkClient) {code} Again not very insightful. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
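The {{InFlightRequest@52f759d7}} output in the first issue is what Java prints for any class that does not override {{toString}}. A minimal illustration with two hypothetical request classes (neither is Kafka's actual InFlightRequest):

```java
final class ToStringSketch {
    // No toString override: logs as ClassName@hexHashCode.
    static final class OpaqueRequest {
        final String topic;
        final long fetchOffset;

        OpaqueRequest(String topic, long fetchOffset) {
            this.topic = topic;
            this.fetchOffset = fetchOffset;
        }
    }

    // toString override: logs the fields that matter when debugging.
    static final class DescriptiveRequest {
        final String topic;
        final long fetchOffset;

        DescriptiveRequest(String topic, long fetchOffset) {
            this.topic = topic;
            this.fetchOffset = fetchOffset;
        }

        @Override
        public String toString() {
            return "{topic=" + topic + ",fetch_offset=" + fetchOffset + "}";
        }
    }

    public static void main(String[] args) {
        System.out.println("Cancelled request " + new OpaqueRequest("t", 5L));
        System.out.println("Cancelled request " + new DescriptiveRequest("t", 5L));
    }
}
```

Holding the full request in the in-flight object, as the ticket suggests, would let a {{toString}} like the second one surface partitions and offsets in the trace log.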
[jira] [Commented] (KAFKA-4753) KafkaConsumer susceptible to FetchResponse starvation
[ https://issues.apache.org/jira/browse/KAFKA-4753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15860872#comment-15860872 ] Onur Karaman commented on KAFKA-4753: - It might mitigate the problem to some extent for users whose consumers participate in group coordination, but it wouldn't help with manual partition assignments since the assignor would never get involved. It's also not a solution to the problem. Users can still experience the other scenarios. We even give users the API to seek to wherever they'd like, indicating that doing so should just work. > KafkaConsumer susceptible to FetchResponse starvation > - > > Key: KAFKA-4753 > URL: https://issues.apache.org/jira/browse/KAFKA-4753 > Project: Kafka > Issue Type: Bug >Reporter: Onur Karaman >Assignee: Onur Karaman > > FetchResponse starvation here means that the KafkaConsumer repeatedly fails > to fully form FetchResponses within the request timeout from a subset of the > brokers it's fetching from while FetchResponses from the other brokers can get > fully formed and processed by the application. > In other words, this ticket is concerned with scenarios where fetching from > some brokers hurts the progress of fetching from other brokers to the point > of repeatedly hitting a request timeout. > Some FetchResponse starvation scenarios: > 1. partition leadership of the consumer's assigned partitions is skewed > across brokers, causing uneven FetchResponse sizes across brokers. > 2. the consumer seeks back on partitions on some brokers but not others, > causing uneven FetchResponse sizes across brokers. > 3. the consumer's ability to keep up with various partitions across brokers > is skewed, causing uneven FetchResponse sizes across brokers. > I've personally seen scenario 1 happen this past week to one of our users in > prod. They manually assigned partitions such that a few brokers led most of > the partitions while other brokers only led a single partition. 
When > NetworkClient sends out FetchRequests to different brokers in parallel with > an uneven partition distribution, FetchResponses from brokers who lead more > partitions will contain more data than FetchResponses from brokers who lead > few partitions. This means the small FetchResponses will get fully formed > quicker than larger FetchResponses. When the application eventually consumes > a smaller fully formed FetchResponse, the NetworkClient will send out a new > FetchRequest to the lightly-loaded broker. Their response will again come > back quickly while only marginal progress has been made on the larger > FetchResponse. Repeat this process several times and your application will > have potentially processed many smaller FetchResponses while the larger > FetchResponse made minimal progress and is forced to time out, causing the > large FetchResponse to start all over again, which causes starvation. > To mitigate the problem for the short term, I've suggested to our user that > they either: > 1. bump up their "receive.buffer.bytes" beyond the current default of 64 KB > to something like 1 MB. This is the short-term solution I > suggested they go with. > 2. reduce the "max.partition.fetch.bytes" down from the current default of 1 > MB to something like 100 KB. This solution wasn't advised as it could impact > broker performance. > 3. ask our SREs to run a partition reassignment to balance out the partition > leadership (partitions were already being led by their preferred leaders). > 4. bump up their request timeout. It was set to open-source's former default > of 40 seconds. > Contributing factors: > 1. uneven FetchResponse sizes across brokers. > 2. processing time of the polled ConsumerRecords. > 3. "max.poll.records" increases the number of polls needed to consume a > FetchResponse, making constant-time overhead per poll magnified. > 4. "max.poll.records" makes KafkaConsumer.poll bypass calls to > ConsumerNetworkClient.poll. > 5. 
java.nio.channels.Selector.select, Selector.poll, NetworkClient.poll, and > ConsumerNetworkClient.poll can return before the poll timeout as soon as a > single channel is selected. > 6. NetworkClient.poll is solely driven by the user thread with manual > partition assignment. > So far I've only locally reproduced starvation scenario 1 and haven't even > attempted the other scenarios. Preventing the bypass of > ConsumerNetworkClient.poll (contributing factor 4) mitigates the issue, but > it seems starvation would still be possible. > How to reproduce starvation scenario 1: > 1. startup zookeeper > 2. startup two brokers > 3. create a topic t0 with two partitions led by broker 0 and create a topic > t1 with a single partition led by broker 1 > {code} > > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t0 > >
[jira] [Commented] (KAFKA-4753) KafkaConsumer susceptible to FetchResponse starvation
[ https://issues.apache.org/jira/browse/KAFKA-4753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15860366#comment-15860366 ]

Onur Karaman commented on KAFKA-4753:

One way to mitigate but not fully solve the starvation problem is to make sure that KafkaConsumer.poll always triggers the java.nio.channels.Selector. For instance, we can get rid of the bypass in pollOnce:
{code}
-// if data is available already, return it immediately
-Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
-if (!records.isEmpty())
-    return records;
-
{code}
Alternatively, we can add some logic in KafkaConsumer.poll:
{code}
 Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
 if (!records.isEmpty()) {
     // before returning the fetched records, we can send off the next round of fetches
     // and avoid block waiting for their responses to enable pipelining while the user
     // is handling the fetched records.
     //
     // NOTE: since the consumed position has already been updated, we must not allow
     // wakeups or any other errors to be triggered prior to returning the fetched records.
-    if (fetcher.sendFetches() > 0) {
+    if (fetcher.sendFetches() > 0 || client.pendingRequestCount() > 0) {
         client.pollNoWakeup();
     }

     if (this.interceptors == null)
         return new ConsumerRecords<>(records);
     else
         return this.interceptors.onConsume(new ConsumerRecords<>(records));
 }
{code}
I'm not crazy about these solutions as they only mitigate the problem and don't solve it.

> KafkaConsumer susceptible to FetchResponse starvation > - > > Key: KAFKA-4753 > URL: https://issues.apache.org/jira/browse/KAFKA-4753 > Project: Kafka > Issue Type: Bug >Reporter: Onur Karaman >Assignee: Onur Karaman > > FetchResponse starvation here means that the KafkaConsumer repeatedly fails > to fully form FetchResponses within the request timeout from a subset of the > brokers it is fetching from while FetchResponses from the other brokers can get > fully formed and processed by the application. 
> In other words, this ticket is concerned with scenarios where fetching from > some brokers hurts the progress of fetching from other brokers to the point > of repeatedly hitting a request timeout. > Some FetchResponse starvation scenarios: > 1. partition leadership of the consumer's assigned partitions is skewed > across brokers, causing uneven FetchResponse sizes across brokers. > 2. the consumer seeks back on partitions on some brokers but not others, > causing uneven FetchResponse sizes across brokers. > 3. the consumer's ability to keep up with various partitions across brokers > is skewed, causing uneven FetchResponse sizes across brokers. > I've personally seen scenario 1 happen this past week to one of our users in > prod. They manually assigned partitions such that a few brokers led most of > the partitions while other brokers only led a single partition. When > NetworkClient sends out FetchRequests to different brokers in parallel with > an uneven partition distribution, FetchResponses from brokers that lead more > partitions will contain more data than FetchResponses from brokers that lead > few partitions. This means the small FetchResponses will get fully formed > quicker than larger FetchResponses. When the application eventually consumes > a smaller fully formed FetchResponse, the NetworkClient will send out a new > FetchRequest to the lightly-loaded broker. Its response will again come > back quickly while only marginal progress has been made on the larger > FetchResponse. Repeat this process several times and your application will > have potentially processed many smaller FetchResponses while the larger > FetchResponse made minimal progress and is forced to time out, causing the > large FetchResponse to start all over again, which causes starvation. > To mitigate the problem for the short term, I've suggested to our user that > they either: > 1. bump up their "receive.buffer.bytes" beyond the current default of 64 KB > to something like 1 MB. 
This is the short-term solution I > suggested they go with. > 2. reduce the "max.partition.fetch.bytes" down from the current default of 1 > MB to something like 100 KB. This solution wasn't advised as it could impact > broker performance. > 3. ask our SREs to run a partition reassignment to balance out the partition > leadership (partitions were already being led by their
[jira] [Created] (KAFKA-4753) KafkaConsumer susceptible to FetchResponse starvation
Onur Karaman created KAFKA-4753:
---
Summary: KafkaConsumer susceptible to FetchResponse starvation
Key: KAFKA-4753
URL: https://issues.apache.org/jira/browse/KAFKA-4753
Project: Kafka
Issue Type: Bug
Reporter: Onur Karaman
Assignee: Onur Karaman

FetchResponse starvation here means that the KafkaConsumer repeatedly fails to fully form FetchResponses within the request timeout from a subset of the brokers it is fetching from while FetchResponses from the other brokers can get fully formed and processed by the application. In other words, this ticket is concerned with scenarios where fetching from some brokers hurts the progress of fetching from other brokers to the point of repeatedly hitting a request timeout.

Some FetchResponse starvation scenarios:
1. partition leadership of the consumer's assigned partitions is skewed across brokers, causing uneven FetchResponse sizes across brokers.
2. the consumer seeks back on partitions on some brokers but not others, causing uneven FetchResponse sizes across brokers.
3. the consumer's ability to keep up with various partitions across brokers is skewed, causing uneven FetchResponse sizes across brokers.

I've personally seen scenario 1 happen this past week to one of our users in prod. They manually assigned partitions such that a few brokers led most of the partitions while other brokers only led a single partition. When NetworkClient sends out FetchRequests to different brokers in parallel with an uneven partition distribution, FetchResponses from brokers that lead more partitions will contain more data than FetchResponses from brokers that lead few partitions. This means the small FetchResponses will get fully formed quicker than larger FetchResponses. When the application eventually consumes a smaller fully formed FetchResponse, the NetworkClient will send out a new FetchRequest to the lightly-loaded broker. Its response will again come back quickly while only marginal progress has been made on the larger FetchResponse. Repeat this process several times and your application will have potentially processed many smaller FetchResponses while the larger FetchResponse made minimal progress and is forced to time out, causing the large FetchResponse to start all over again, which causes starvation.

To mitigate the problem for the short term, I've suggested to our user that they either:
1. bump up their "receive.buffer.bytes" beyond the current default of 64 KB to something like 1 MB. This is the short-term solution I suggested they go with.
2. reduce the "max.partition.fetch.bytes" down from the current default of 1 MB to something like 100 KB. This solution wasn't advised as it could impact broker performance.
3. ask our SREs to run a partition reassignment to balance out the partition leadership (partitions were already being led by their preferred leaders).
4. bump up their request timeout. It was set to open-source's former default of 40 seconds.

Contributing factors:
1. uneven FetchResponse sizes across brokers.
2. processing time of the polled ConsumerRecords.
3. "max.poll.records" increases the number of polls needed to consume a FetchResponse, magnifying the constant-time overhead per poll.
4. "max.poll.records" makes KafkaConsumer.poll bypass calls to ConsumerNetworkClient.poll.
5. java.nio.channels.Selector.select, Selector.poll, NetworkClient.poll, and ConsumerNetworkClient.poll can return before the poll timeout as soon as a single channel is selected.
6. NetworkClient.poll is solely driven by the user thread with manual partition assignment.

So far I've only locally reproduced starvation scenario 1 and haven't even attempted the other scenarios. Preventing the bypass of ConsumerNetworkClient.poll (contributing factor 4) mitigates the issue, but it seems starvation would still be possible.

How to reproduce starvation scenario 1:
1. start up zookeeper
2. start up two brokers
3. create a topic t0 with two partitions led by broker 0 and create a topic t1 with a single partition led by broker 1
{code}
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t0 --replica-assignment 0,0
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t1 --replica-assignment 1
{code}
4. produce a lot of data into these topics
{code}
> ./bin/kafka-producer-perf-test.sh --topic t0 --num-records 2000 --record-size 100 --throughput 10 --producer-props bootstrap.servers=localhost:9090,localhost:9091
> ./bin/kafka-producer-perf-test.sh --topic t1 --num-records 1000 --record-size 100 --throughput 10 --producer-props bootstrap.servers=localhost:9090,localhost:9091
{code}
5. start up a consumer that consumes these 3 partitions with TRACE level NetworkClient logging
{code}
> ./bin/kafka-run-class.sh >
[jira] [Created] (KAFKA-4749) fix join-time-max and sync-time-max MeasurableStat type
Onur Karaman created KAFKA-4749: --- Summary: fix join-time-max and sync-time-max MeasurableStat type Key: KAFKA-4749 URL: https://issues.apache.org/jira/browse/KAFKA-4749 Project: Kafka Issue Type: Bug Reporter: Onur Karaman Assignee: Onur Karaman GroupCoordinatorMetrics currently sets up join-time-max and sync-time-max incorrectly as a "new Avg()" MeasurableStat instead of "new Max()" -- This message was sent by Atlassian JIRA (v6.3.15#6346)
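To illustrate why the stat type matters for KAFKA-4749, here is a minimal, self-contained sketch. It does not use Kafka's metrics classes: `SimpleStat`, `AvgStat`, `MaxStat` and the sample join times are hypothetical stand-ins that only mimic the idea of an averaging stat versus a maximum stat. The point is simply that a metric named "-max" backed by an average under-reports the worst case.

```java
public class JoinTimeStatSketch {
    /** Hypothetical stand-in for a measurable stat: record samples, then read one value. */
    interface SimpleStat {
        void record(double value);
        double measure();
    }

    /** Averages all recorded samples (what the ticket says the "-max" metrics wrongly use). */
    static final class AvgStat implements SimpleStat {
        private double sum = 0.0;
        private long count = 0;
        public void record(double value) { sum += value; count++; }
        public double measure() { return count == 0 ? 0.0 : sum / count; }
    }

    /** Keeps the largest recorded sample (what a "-max" metric should report). */
    static final class MaxStat implements SimpleStat {
        private double max = Double.NEGATIVE_INFINITY;
        public void record(double value) { max = Math.max(max, value); }
        public double measure() { return max; }
    }

    /** Records every sample into the stat and returns the measured value. */
    static double measureAll(SimpleStat stat, double... samples) {
        for (double sample : samples) {
            stat.record(sample);
        }
        return stat.measure();
    }

    public static void main(String[] args) {
        // Made-up join times in milliseconds, with one slow outlier.
        double[] joinTimesMs = {10.0, 20.0, 300.0};
        System.out.println("avg = " + measureAll(new AvgStat(), joinTimesMs));
        System.out.println("max = " + measureAll(new MaxStat(), joinTimesMs));
    }
}
```

With these samples the averaging stat hides the 300 ms outlier that a maximum stat would surface.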
[jira] [Created] (KAFKA-4747) add metrics for KafkaConsumer.poll
Onur Karaman created KAFKA-4747: --- Summary: add metrics for KafkaConsumer.poll Key: KAFKA-4747 URL: https://issues.apache.org/jira/browse/KAFKA-4747 Project: Kafka Issue Type: Improvement Reporter: Onur Karaman Assignee: Onur Karaman KafkaConsumer heavily depends on KafkaConsumer.poll yet we don't have metrics directly associated with it. We probably want to add two metrics: 1. time spent in KafkaConsumer.poll 2. time since last KafkaConsumer.poll (measured as now - endTimeOfLastPoll) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
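The two metrics proposed in KAFKA-4747 could be tracked roughly as in the sketch below. This is only an illustration under assumptions: the class `PollTimingSketch` and its methods are hypothetical and are not part of KafkaConsumer; timestamps are passed in explicitly so the arithmetic is easy to follow.

```java
public class PollTimingSketch {
    private long pollStartMs = -1L;          // start of the poll currently (or most recently) running
    private long endTimeOfLastPollMs = -1L;  // -1 until the first poll has completed
    private long lastPollDurationMs = 0L;

    /** Call at the top of poll. */
    void pollStarted(long nowMs) {
        pollStartMs = nowMs;
    }

    /** Call just before poll returns. */
    void pollEnded(long nowMs) {
        lastPollDurationMs = nowMs - pollStartMs;
        endTimeOfLastPollMs = nowMs;
    }

    /** Metric 1: time spent in the most recently completed poll. */
    long timeInLastPollMs() {
        return lastPollDurationMs;
    }

    /** Metric 2: time since the last poll, measured as now - endTimeOfLastPoll. */
    long timeSinceLastPollMs(long nowMs) {
        return endTimeOfLastPollMs < 0 ? 0L : nowMs - endTimeOfLastPollMs;
    }

    public static void main(String[] args) {
        PollTimingSketch timing = new PollTimingSketch();
        timing.pollStarted(1_000L);
        timing.pollEnded(1_040L);
        System.out.println("time in poll (ms): " + timing.timeInLastPollMs());
        System.out.println("time since last poll (ms): " + timing.timeSinceLastPollMs(1_100L));
    }
}
```

A large "time since last poll" would point at slow record processing in the application, while a large "time in poll" would point at the consumer itself.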
[jira] [Assigned] (KAFKA-4513) Support migration of old consumers to new consumers without downtime
[ https://issues.apache.org/jira/browse/KAFKA-4513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman reassigned KAFKA-4513: --- Assignee: Onur Karaman > Support migration of old consumers to new consumers without downtime > > > Key: KAFKA-4513 > URL: https://issues.apache.org/jira/browse/KAFKA-4513 > Project: Kafka > Issue Type: New Feature >Reporter: Ismael Juma >Assignee: Onur Karaman > > Some ideas were discussed in the following thread: > http://markmail.org/message/ovngfw3ibixlquxh -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-4704) Group coordinator cache loading fails if groupId is used first for consumer groups and then for simple consumer
[ https://issues.apache.org/jira/browse/KAFKA-4704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15840663#comment-15840663 ] Onur Karaman commented on KAFKA-4704: - It might be worth mentioning that I hit a similar scenario when implementing the new consumer migration. Rolling back from the migration-aware old consumer (or just new consumer) to the old consumer with kafka-based offset storage (or {{dual.commit.enabled}}) causes the old consumer offset commits to fail silently. This is because by that point, the group has been added to the {{GroupCoordinator}} and generation id has been incremented to be >= 0. The old consumer, on the other hand, is naively sending {{OffsetCommitRequests}} with an empty member id and generation id of -1, so the {{GroupCoordinator}} will reject the request with {{UNKNOWN_MEMBER_ID}}. I have a workaround constraint to address the problem, but I'll leave that for when I send out the actual proposal. > Group coordinator cache loading fails if groupId is used first for consumer > groups and then for simple consumer > --- > > Key: KAFKA-4704 > URL: https://issues.apache.org/jira/browse/KAFKA-4704 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1 >Reporter: Jason Gustafson >Assignee: Jason Gustafson > Fix For: 0.10.2.0 > > > When all the members in a consumer group have died and all of its offsets > have expired, we write a tombstone to __consumer_offsets so that its group > metadata is cleaned up. It is possible that after this happens, the same > groupId is then used only for offset storage (i.e. by "simple" consumers). > Our current cache loading logic, which is triggered when a coordinator first > takes over control of a partition, does not account for this scenario and > would currently fail. > This is probably an unlikely scenario to hit in practice, but it reveals the > lack of test coverage around the cache loading logic. We should improve this. 
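The rollback failure described in the comment above can be modelled with a much simplified sketch. This is not the actual GroupCoordinator code: `GroupState`, `CommitError`, `validateCommit` and the member id "new-consumer-1" are hypothetical names, and the rule that a group with no generation accepts offset-only commits is an assumption made for illustration. It only shows why a commit with an empty member id and generation id -1 is rejected once the group has members and a generation >= 0.

```java
import java.util.HashSet;
import java.util.Set;

public class OffsetCommitCheckSketch {
    enum CommitError { NONE, UNKNOWN_MEMBER_ID, ILLEGAL_GENERATION }

    /** Hypothetical, simplified view of what a coordinator knows about a group. */
    static final class GroupState {
        final Set<String> memberIds = new HashSet<>();
        int generationId = -1; // assumption: -1 means no coordinator-managed generation yet
    }

    /** Simplified validation of an offset commit against the group's state. */
    static CommitError validateCommit(GroupState group, String memberId, int generationId) {
        // Assumption: a group that has never had a generation only uses Kafka for offset storage.
        if (group.generationId < 0 && generationId < 0) {
            return CommitError.NONE;
        }
        if (!group.memberIds.contains(memberId)) {
            return CommitError.UNKNOWN_MEMBER_ID;
        }
        if (generationId != group.generationId) {
            return CommitError.ILLEGAL_GENERATION;
        }
        return CommitError.NONE;
    }

    public static void main(String[] args) {
        // Group that has only ever been used for offset storage.
        GroupState offsetsOnly = new GroupState();
        System.out.println(validateCommit(offsetsOnly, "", -1));

        // Group that a new consumer has already joined, so the generation is >= 0.
        GroupState migrated = new GroupState();
        migrated.memberIds.add("new-consumer-1");
        migrated.generationId = 1;
        // The old consumer still sends an empty member id and generation id -1.
        System.out.println(validateCommit(migrated, "", -1));
    }
}
```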
[jira] [Updated] (KAFKA-3959) __consumer_offsets wrong number of replicas at startup
[ https://issues.apache.org/jira/browse/KAFKA-3959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-3959: Fix Version/s: (was: 0.10.3.0) 0.10.2.0 > __consumer_offsets wrong number of replicas at startup > -- > > Key: KAFKA-3959 > URL: https://issues.apache.org/jira/browse/KAFKA-3959 > Project: Kafka > Issue Type: Bug > Components: consumer, offset manager, replication >Affects Versions: 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.0.2, 0.10.1.0, > 0.10.1.1, 0.10.1.2 > Environment: Brokers of 3 kafka nodes running Red Hat Enterprise > Linux Server release 7.2 (Maipo) >Reporter: Alban Hurtaud >Assignee: Onur Karaman >Priority: Blocker > Labels: needs-kip, reliability > Fix For: 0.10.2.0 > > > When creating a stack of 3 kafka brokers, the consumer starts faster > than the kafka nodes, so when it tries to read a topic, only one kafka node is > available. > So __consumer_offsets is created with a replication factor of 1 > (instead of the configured 3): > offsets.topic.replication.factor=3 > default.replication.factor=3 > min.insync.replicas=2 > Then the other kafka nodes come up and exceptions are thrown because the replica count > for __consumer_offsets is 1 and min.insync.replicas is 2. > What I don't understand is: why is __consumer_offsets created with a replication factor of > 1 (when 1 broker is running) when server.properties sets it to 3? > To reproduce: > - Prepare 3 kafka nodes with the 3 lines above added to server.properties. > - Run one kafka node. > - Run one consumer (__consumer_offsets is created with replicas=1). > - Run 2 more kafka nodes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4576) Log segments close to max size break on fetch
[ https://issues.apache.org/jira/browse/KAFKA-4576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15794567#comment-15794567 ] Onur Karaman commented on KAFKA-4576: - I think you're right, [~huxi_2b]. It looks like we incorrectly assume "FileChannel.read" does the full read all throughout the FileMessageSet class. > Log segments close to max size break on fetch > - > > Key: KAFKA-4576 > URL: https://issues.apache.org/jira/browse/KAFKA-4576 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 0.10.1.1 >Reporter: Ivan Babrou > > We are running Kafka 0.10.1.1~rc1 (it's the same as 0.10.1.1). > Max segment size is set to 2147483647 globally, that's 1 byte less than max > signed int32. > Every now and then we see failures like this: > {noformat} > Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: ERROR [Replica Manager on > Broker 1006]: Error processing fetch operation on partition [mytopic,11], > offset 483579108587 (kafka.server.ReplicaManager) > Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: > java.lang.IllegalStateException: Failed to read complete buffer for > targetOffset 483686627237 startPosition 2145701130 in > /disk/data0/kafka-logs/mytopic-11/483571890786.log > Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at > kafka.log.FileMessageSet.searchForOffsetWithSize(FileMessageSet.scala:145) > Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at > kafka.log.LogSegment.translateOffset(LogSegment.scala:128) > Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at > kafka.log.LogSegment.read(LogSegment.scala:180) > Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at > kafka.log.Log.read(Log.scala:563) > Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at > kafka.server.ReplicaManager.kafka$server$ReplicaManager$$read$1(ReplicaManager.scala:567) > Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at > kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:606) > Dec 25 18:20:47 myhost 
kafka-run-class.sh[2054]: at > kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:605) > Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at > scala.collection.Iterator$class.foreach(Iterator.scala:893) > Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at > scala.collection.AbstractIterator.foreach(Iterator.scala:1336) > Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at > scala.collection.AbstractIterable.foreach(Iterable.scala:54) > Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at > kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:605) > Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at > kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:469) > Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at > kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:534) > Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at > kafka.server.KafkaApis.handle(KafkaApis.scala:79) > Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at > kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) > Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at > java.lang.Thread.run(Thread.java:745) > {noformat} > {noformat} > ... 
> -rw-r--r-- 1 kafka kafka 0 Dec 25 15:15 > 483557418204.timeindex > -rw-r--r-- 1 kafka kafka 9496 Dec 25 15:26 483564654488.index > -rw-r--r-- 1 kafka kafka 2145763964 Dec 25 15:26 483564654488.log > -rw-r--r-- 1 kafka kafka 0 Dec 25 15:26 > 483564654488.timeindex > -rw-r--r-- 1 kafka kafka 9576 Dec 25 15:37 483571890786.index > -rw-r--r-- 1 kafka kafka 2147483644 Dec 25 15:37 483571890786.log > -rw-r--r-- 1 kafka kafka 0 Dec 25 15:37 > 483571890786.timeindex > -rw-r--r-- 1 kafka kafka 9568 Dec 25 15:48 483579135712.index > -rw-r--r-- 1 kafka kafka 2146791360 Dec 25 15:48 483579135712.log > -rw-r--r-- 1 kafka kafka 0 Dec 25 15:48 > 483579135712.timeindex > -rw-r--r-- 1 kafka kafka 9408 Dec 25 15:59 483586374164.index > ... > {noformat} > Here 483571890786.log is just 3 bytes below the max size. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
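The KAFKA-4576 comment above notes that the code assumes "FileChannel.read" always does a full read. A single call to `FileChannel.read(ByteBuffer, long)` may return fewer bytes than the buffer has room for, so callers that need a complete buffer typically loop. The sketch below shows that pattern in isolation; `ReadFullySketch`, `readFully` and `demo` are hypothetical names and this is not the FileMessageSet code or the fix that was applied.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;

public class ReadFullySketch {
    /** Keeps reading from the given absolute position until dst is full, or fails on EOF. */
    static void readFully(FileChannel channel, ByteBuffer dst, long position) throws IOException {
        long currentPosition = position;
        while (dst.hasRemaining()) {
            int bytesRead = channel.read(dst, currentPosition);
            if (bytesRead < 0) {
                throw new EOFException("Reached end of file with " + dst.remaining() + " bytes still expected");
            }
            currentPosition += bytesRead;
        }
    }

    /** Writes 8 bytes to a temp file and reads 4 of them back starting at position 2. */
    static byte[] demo() {
        try {
            Path tmp = Files.createTempFile("read-fully", ".log");
            try {
                Files.write(tmp, new byte[] {1, 2, 3, 4, 5, 6, 7, 8});
                try (FileChannel channel = FileChannel.open(tmp, StandardOpenOption.READ)) {
                    ByteBuffer buffer = ByteBuffer.allocate(4);
                    readFully(channel, buffer, 2L);
                    return buffer.array();
                }
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));
    }
}
```

Checking the return value on every call, rather than treating a short read as an error, is what distinguishes this from the single-read assumption the comment describes.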