[jira] [Created] (KAFKA-6082) consider fencing zookeeper updates with controller epoch zkVersion

2017-10-18 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-6082:
---

 Summary: consider fencing zookeeper updates with controller epoch 
zkVersion
 Key: KAFKA-6082
 URL: https://issues.apache.org/jira/browse/KAFKA-6082
 Project: Kafka
  Issue Type: Sub-task
Reporter: Onur Karaman


If we want, we can use multi-op to fence zookeeper updates with the controller 
epoch's zkVersion.
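
Roughly, the idea looks like the following sketch using the raw ZooKeeper multi API (/controller_epoch is the controller epoch znode; the method name and parameters here are illustrative, not an actual Kafka API):
{code}
import org.apache.zookeeper.{KeeperException, Op, ZooKeeper}
import scala.collection.JavaConverters._

object ControllerFencedWriteSketch {
  def fencedSetData(zk: ZooKeeper,
                    path: String,
                    data: Array[Byte],
                    dataZkVersion: Int,
                    expectedControllerEpochZkVersion: Int): Unit = {
    val ops = Seq(
      // fails the whole multi if another controller has since bumped /controller_epoch
      Op.check("/controller_epoch", expectedControllerEpochZkVersion),
      // the actual update we want to make
      Op.setData(path, data, dataZkVersion)
    )
    try zk.multi(ops.asJava)
    catch {
      case _: KeeperException.BadVersionException =>
        // the controller epoch (or the target znode) changed underneath us:
        // we are no longer the active controller, so abandon the write
    }
  }
}
{code}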



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6081) response error code checking

2017-10-18 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-6081:
---

 Summary: response error code checking
 Key: KAFKA-6081
 URL: https://issues.apache.org/jira/browse/KAFKA-6081
 Project: Kafka
  Issue Type: Sub-task
Reporter: Onur Karaman


In most cases in the controller, we assume that requests succeed. We should 
instead check their responses for errors.

Example: partition reassignment has the following todo:
{code}
// TODO: Eventually partition reassignment could use a callback that does 
retries if deletion failed
{code}
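
One possible shape for such a callback (a sketch with hypothetical names, not the controller's actual API): inspect the per-partition error codes and re-queue failures instead of assuming success.
{code}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors

object ResponseErrorCheckSketch {
  // `retryDeletion` is a stand-in for whatever mechanism re-enqueues the failed work
  def onStopReplicaResponse(partitionErrors: Map[TopicPartition, Errors],
                            retryDeletion: TopicPartition => Unit): Unit = {
    partitionErrors.foreach { case (partition, error) =>
      if (error != Errors.NONE) {
        // deletion failed on this replica; retry instead of silently assuming success
        retryDeletion(partition)
      }
    }
  }
}
{code}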



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5083) always leave the last surviving member of the ISR in ZK

2017-10-18 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman resolved KAFKA-5083.
-
Resolution: Fixed

This has been fixed in KAFKA-5642.

> always leave the last surviving member of the ISR in ZK
> ---
>
> Key: KAFKA-5083
> URL: https://issues.apache.org/jira/browse/KAFKA-5083
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> Currently we erase ISR membership if the replica to be removed from the ISR 
> is the last surviving member of the ISR and unclean leader election is 
> enabled for the corresponding topic.
> We should investigate leaving the last replica in ISR in ZK, independent of 
> whether unclean leader election is enabled or not. That way, if people 
> re-disable unclean leader election, we can still try to elect the leader 
> from the last in-sync replica.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6065) Add zookeeper metrics to ZookeeperClient as in KIP-188

2017-10-16 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-6065:
---

 Summary: Add zookeeper metrics to ZookeeperClient as in KIP-188
 Key: KAFKA-6065
 URL: https://issues.apache.org/jira/browse/KAFKA-6065
 Project: Kafka
  Issue Type: Sub-task
Reporter: Onur Karaman


Among other things, KIP-188 added latency metrics to ZkUtils. We should add the 
same metrics to ZookeeperClient.
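
A rough sketch of the shape this could take with the yammer metrics library the broker already uses (the metric name and the timed helper are assumptions, not the actual ZookeeperClient implementation):
{code}
import java.util.concurrent.TimeUnit
import com.yammer.metrics.Metrics

object ZkLatencyMetricSketch {
  // histogram name mirrors the ZkUtils latency metric from KIP-188; exact naming is an assumption
  private val latencyHistogram =
    Metrics.defaultRegistry().newHistogram(getClass, "ZooKeeperRequestLatencyMs")

  // wrap each zookeeper round trip and record how long it took in milliseconds
  def timed[T](request: => T): T = {
    val start = System.nanoTime()
    try request
    finally latencyHistogram.update(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start))
  }
}
{code}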



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6014) new consumer mirror maker halts after committing offsets to a deleted topic

2017-10-04 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-6014:
---

 Summary: new consumer mirror maker halts after committing offsets 
to a deleted topic
 Key: KAFKA-6014
 URL: https://issues.apache.org/jira/browse/KAFKA-6014
 Project: Kafka
  Issue Type: Bug
Reporter: Onur Karaman


The new consumer throws an unexpected KafkaException when trying to commit to a 
topic that has been deleted. MirrorMaker.commitOffsets doesn't attempt to catch 
the KafkaException and just kills the process. We didn't see this with the old 
consumer because it silently drops failed offset commits.

I ran a quick experiment locally to prove the behavior. The experiment:
1. start up a single broker
2. create a single-partition topic t
3. create a new consumer that consumes topic t
4. make the consumer commit every few seconds
5. delete topic t
6. expect: KafkaException that kills the process.

Here's my script:
{code}
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class OffsetCommitTopicDeletionTest {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9090");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(props);
        TopicPartition partition = new TopicPartition("t", 0);
        List<TopicPartition> partitions = Collections.singletonList(partition);
        kafkaConsumer.assign(partitions);
        while (true) {
            kafkaConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(0, "")));
            Thread.sleep(1000);
        }
    }
}
{code}

Here are the other commands:
{code}
> rm -rf /tmp/zookeeper/ /tmp/kafka-logs* logs*
> ./gradlew clean jar
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
> export LOG_DIR=logs0 && ./bin/kafka-server-start.sh config/server0.properties
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t 
> --partitions 1 --replication-factor 1
> ./bin/kafka-run-class.sh 
> org.apache.kafka.clients.consumer.OffsetCommitTopicDeletionTest
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic t
{code}

Here is the output:
{code}
[2017-10-04 20:00:14,451] ERROR [Consumer clientId=consumer-1, groupId=g] 
Offset commit failed on partition t-0 at offset 0: This server does not host 
this topic-partition. 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
Exception in thread "main" org.apache.kafka.common.KafkaException: Partition 
t-0 may not exist or user may not have Describe access to topic
  at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:789)
  at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:734)
  at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
  at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
  at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
  at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
  at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
  at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:506)
  at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
  at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:268)
  at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
  at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:190)
  at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600)
  at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1231)
  at 
org.apache.kafka.clients.consumer.OffsetCommitTopicDeletionTest.main(OffsetCommitTopicDeletionTest.java:22)

[jira] [Created] (KAFKA-5894) add the notion of max inflight requests to async ZookeeperClient

2017-09-14 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-5894:
---

 Summary: add the notion of max inflight requests to async 
ZookeeperClient
 Key: KAFKA-5894
 URL: https://issues.apache.org/jira/browse/KAFKA-5894
 Project: Kafka
  Issue Type: Sub-task
Reporter: Onur Karaman
Assignee: Onur Karaman


ZookeeperClient is a zookeeper client that encourages pipelined requests to 
zookeeper. We want to add the notion of max inflight requests to the client for 
several reasons:
# to bound memory overhead associated with async requests on the client.
# to not overwhelm the zookeeper ensemble with a burst of requests.
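
A minimal sketch of the idea (not ZookeeperClient's actual API): gate sends on a semaphore that is released from the completion callback, so a request is only handed to zookeeper once a permit is available.
{code}
import java.util.concurrent.Semaphore

// bounds both the client-side memory for outstanding requests and the burst seen by the ensemble
class BoundedPipelineSketch(maxInFlightRequests: Int) {
  private val inFlight = new Semaphore(maxInFlightRequests)

  // `send` stands in for an async zookeeper call that invokes the supplied completion hook
  def submit(send: (() => Unit) => Unit): Unit = {
    inFlight.acquire()               // blocks once maxInFlightRequests are outstanding
    send(() => inFlight.release())   // the completion callback frees a slot
  }
}
{code}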



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-4747) add metrics for KafkaConsumer.poll

2017-09-11 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman resolved KAFKA-4747.
-
Resolution: Won't Fix

[~junrao] pointed out that the split between time-in-poll and 
time-in-application can be effectively computed as 1 - (io-ratio) - 
(io-wait-ratio). If this value is close to 1, then time is mostly being spent 
on the application side; if it is close to 0, then time is mostly being spent 
on the client side.

Here's a simple experiment I ran to verify:
{code}
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
 * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
 * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
 * License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Properties;

public class SlowKafkaConsumer {
    public static void main(String[] args) throws InterruptedException {
        long pollTimeout = Long.valueOf(args[0]);
        long sleepDuration = Long.valueOf(args[1]);
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9090");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "onur");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.assign(Collections.singletonList(new TopicPartition("t", 0)));
        kafkaConsumer.seekToBeginning(Collections.singletonList(new TopicPartition("t", 0)));
        while (true) {
            kafkaConsumer.poll(pollTimeout);
            Thread.sleep(sleepDuration);
        }
    }
}
{code}

{code}
no data
===
> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.SlowKafkaConsumer 
> 2000 0
io-ratio ~ 0
io-wait-ratio ~ 0.99

> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.SlowKafkaConsumer 
> 2000 1
io-ratio ~ 0
io-wait-ratio ~ [0.1, 0.2]

> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.SlowKafkaConsumer 
> 2000 2
io-ratio ~ 0
io-wait-ratio ~ [0.05, 0.12]

with data
=
> ./bin/kafka-producer-perf-test.sh --producer-props 
> bootstrap.servers=localhost:9090 --topic t --throughput -1 --num-records 
> 1 --record-size 1000

> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.SlowKafkaConsumer 
> 2000 0
io-ratio ~ 0.06
io-wait-ratio ~ 0.8

> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.SlowKafkaConsumer 
> 2000 1
io-ratio ~ 0
io-wait-ratio ~ [0.05, 0.1]

> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.SlowKafkaConsumer 
> 2000 2
io-ratio ~ 0
io-wait-ratio ~ [0, 0.03]
{code}
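
For reference, the application-time fraction (1 - io-ratio - io-wait-ratio) can be read directly off the consumer's metrics; a minimal sketch (assumes the 0.11-era Metric.value() API and that both metrics are registered):
{code}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

object ApplicationTimeFraction {
  // returns 1 - io-ratio - io-wait-ratio for the given consumer
  def apply(consumer: KafkaConsumer[_, _]): Double = {
    val metrics = consumer.metrics().asScala
    def metric(name: String): Double =
      metrics.collectFirst { case (metricName, m) if metricName.name == name => m.value() }.getOrElse(0.0)
    1.0 - metric("io-ratio") - metric("io-wait-ratio")
  }
}
{code}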

> add metrics for KafkaConsumer.poll
> --
>
> Key: KAFKA-4747
> URL: https://issues.apache.org/jira/browse/KAFKA-4747
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> KafkaConsumer heavily depends on KafkaConsumer.poll yet we don't have metrics 
> directly associated with it.
> We probably want to add two metrics:
> 1. time spent in KafkaConsumer.poll
> 2. time since last KafkaConsumer.poll (measured as now - endTimeOfLastPoll)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5703) allow debug-level logging for RequestChannel's request logger

2017-08-04 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman resolved KAFKA-5703.
-
Resolution: Fixed

Woops. Looks like [~ijuma] already fixed this 3 days ago:
4086db472d08f6dee4d30dda82ab9ff7b67d1a20

> allow debug-level logging for RequestChannel's request logger
> -
>
> Key: KAFKA-5703
> URL: https://issues.apache.org/jira/browse/KAFKA-5703
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> Git hash d25671884bbbdf7843ada3e7797573a00ac7cd56 introduced a bug in 
> RequestChannel's request logger that causes debug-level logging to never 
> occur.
> {code}
> -  if (requestLogger.isTraceEnabled)
> -requestLogger.trace("Completed request:%s from connection 
> %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s"
> -  .format(requestDesc(true), connectionId, totalTime, 
> requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, 
> responseSendTime, securityProtocol, session.principal))
> -  else if (requestLogger.isDebugEnabled)
> -requestLogger.debug("Completed request:%s from connection 
> %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s"
> -  .format(requestDesc(false), connectionId, totalTime, 
> requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, 
> responseSendTime, securityProtocol, session.principal))
> +  if (requestLogger.isDebugEnabled) {
> +val detailsEnabled = requestLogger.isTraceEnabled
> +requestLogger.trace("Completed request:%s from connection 
> %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s,listener:%s"
> +  .format(requestDesc(detailsEnabled), connectionId, totalTime, 
> requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, 
> responseSendTime, securityProtocol, session.principal, listenerName.value))
> +  }
> {code}
> So trace-level logging is used even if debug-level logging is specified, 
> causing users to not see the non-detailed request logs.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5703) allow debug-level logging for RequestChannel's request logger

2017-08-04 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-5703:
---

 Summary: allow debug-level logging for RequestChannel's request 
logger
 Key: KAFKA-5703
 URL: https://issues.apache.org/jira/browse/KAFKA-5703
 Project: Kafka
  Issue Type: Bug
Reporter: Onur Karaman
Assignee: Onur Karaman


Git hash d25671884bbbdf7843ada3e7797573a00ac7cd56 introduced a bug in 
RequestChannel's request logger that causes debug-level logging to never occur.

{code}
-  if (requestLogger.isTraceEnabled)
-requestLogger.trace("Completed request:%s from connection 
%s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s"
-  .format(requestDesc(true), connectionId, totalTime, 
requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, 
responseSendTime, securityProtocol, session.principal))
-  else if (requestLogger.isDebugEnabled)
-requestLogger.debug("Completed request:%s from connection 
%s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s"
-  .format(requestDesc(false), connectionId, totalTime, 
requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, 
responseSendTime, securityProtocol, session.principal))
+  if (requestLogger.isDebugEnabled) {
+val detailsEnabled = requestLogger.isTraceEnabled
+requestLogger.trace("Completed request:%s from connection 
%s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s,listener:%s"
+  .format(requestDesc(detailsEnabled), connectionId, totalTime, 
requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, 
responseSendTime, securityProtocol, session.principal, listenerName.value))
+  }
{code}

So trace-level logging is used even if debug-level logging is specified, 
causing users to not see the non-detailed request logs.
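
The fix amounts to picking the log level based on what is actually enabled rather than always calling trace. A simplified sketch (requestLogger and requestDesc are passed in explicitly here, unlike the real RequestChannel code, and the many timing/security fields are elided):
{code}
import org.apache.log4j.Logger

object RequestLoggingSketch {
  def logCompletedRequest(requestLogger: Logger, requestDesc: Boolean => String): Unit = {
    if (requestLogger.isDebugEnabled) {
      val detailsEnabled = requestLogger.isTraceEnabled
      val message = s"Completed request:${requestDesc(detailsEnabled)}"
      if (detailsEnabled) requestLogger.trace(message)   // detailed line at trace
      else requestLogger.debug(message)                  // non-detailed line at debug
    }
  }
}
{code}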



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5501) introduce async ZookeeperClient

2017-07-26 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman resolved KAFKA-5501.
-
Resolution: Fixed

> introduce async ZookeeperClient
> ---
>
> Key: KAFKA-5501
> URL: https://issues.apache.org/jira/browse/KAFKA-5501
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Fix For: 1.0.0
>
>
> Synchronous zookeeper apis mean that we wait an entire round trip before 
> doing the next operation. We should introduce a zookeeper client that 
> encourages pipelined requests to zookeeper.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5642) use async ZookeeperClient everywhere

2017-07-26 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-5642:
---

 Summary: use async ZookeeperClient everywhere
 Key: KAFKA-5642
 URL: https://issues.apache.org/jira/browse/KAFKA-5642
 Project: Kafka
  Issue Type: Sub-task
Reporter: Onur Karaman
Assignee: Onur Karaman


Synchronous zookeeper writes mean that we wait an entire round trip before 
doing the next write. These synchronous writes are happening at a per-partition 
granularity in several places, so partition-heavy clusters suffer from the 
controller doing many sequential round trips to zookeeper.
* PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in 
zookeeper on transition to OnlinePartition. This gets triggered per-partition 
sequentially with synchronous writes during controlled shutdown of the shutting 
down broker's replicas for which it is the leader.
* ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to 
OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets 
triggered per-partition sequentially with synchronous writes for failed or 
controlled shutdown brokers.

KAFKA-5501 introduced an async ZookeeperClient that encourages pipelined 
requests to zookeeper. We should replace ZkClient's usage with this client.
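
An illustrative sketch of the pipelining using the raw ZooKeeper async API (not ZookeeperClient's actual interface): issue all the writes without waiting, then block once for the whole batch, so the controller pays roughly one round trip instead of one per partition.
{code}
import java.util.concurrent.CountDownLatch
import org.apache.zookeeper.AsyncCallback.StatCallback
import org.apache.zookeeper.ZooKeeper
import org.apache.zookeeper.data.Stat

object PipelinedWriteSketch {
  def pipelinedSetData(zk: ZooKeeper, updates: Map[String, Array[Byte]]): Unit = {
    val remaining = new CountDownLatch(updates.size)
    val callback = new StatCallback {
      override def processResult(rc: Int, path: String, ctx: Any, stat: Stat): Unit =
        remaining.countDown() // a real client would also check rc here
    }
    // fire all writes back to back without waiting for individual responses
    updates.foreach { case (path, data) => zk.setData(path, data, -1, callback, null) }
    remaining.await() // wait once for the whole pipelined batch
  }
}
{code}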



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5502) read current brokers from zookeeper upon processing broker change

2017-06-22 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-5502:
---

 Summary: read current brokers from zookeeper upon processing 
broker change
 Key: KAFKA-5502
 URL: https://issues.apache.org/jira/browse/KAFKA-5502
 Project: Kafka
  Issue Type: Sub-task
Reporter: Onur Karaman
Assignee: Onur Karaman


[~lindong]'s testing of the 0.11.0 release revealed a controller-side 
performance regression in clusters with many brokers and many partitions when 
bringing up many brokers simultaneously.

The regression is caused by KAFKA-5028: a Watcher receives WatchedEvent 
notifications from the raw ZooKeeper client EventThread. A WatchedEvent only 
contains the following information:
- KeeperState
- EventType
- path

Note that it does not actually contain the current data or current set of 
children associated with the data/child change notification. It is up to the 
user to do this lookup to see the current data or set of children.

ZkClient is itself a Watcher. When it receives a WatchedEvent, it puts a 
ZkEvent into its own queue which its own ZkEventThread processes. Users of 
ZkClient interact with these notifications through listeners (IZkDataListener, 
IZkChildListener). IZkDataListener actually expects as input the current data 
of the watched znode, and likewise IZkChildListener actually expects as input 
the current set of children of the watched znode. In order to provide this 
information to the listeners, the ZkEventThread, when processing the ZkEvent in 
its queue, looks up the information (either the current data or the current set 
of children), simultaneously sets up the next watch, and passes the result to 
the listener.

The regression introduced in KAFKA-5028 is the time at which we look up the 
information needed for the event processing.

In the past, the result looked up by the ZkEventThread during ZkEvent processing 
would be passed into the listener, which processed it immediately afterwards. For 
instance, in ZkClient.fireChildChangedEvents:
{code}
List<String> children = getChildren(path);
listener.handleChildChange(path, children);
{code}
Now, however, there are multiple listeners that pass information looked up by 
the ZkEventThread into a ControllerEvent which gets processed potentially much 
later. For instance in BrokerChangeListener:
{code}
class BrokerChangeListener(controller: KafkaController) extends IZkChildListener with Logging {
  override def handleChildChange(parentPath: String, currentChilds: java.util.List[String]): Unit = {
    import JavaConverters._
    controller.addToControllerEventQueue(controller.BrokerChange(currentChilds.asScala))
  }
}
{code}

In terms of impact, this:
- increases the odds of working with stale information by the time the 
ControllerEvent gets processed.
- can cause the cluster to take a long time to stabilize if you bring up many 
brokers simultaneously.

In terms of how to solve it:
- (short term) ignore the information ZkClient looked up and repeat the lookup at 
the start of ControllerEvent processing. This is the approach taken in this ticket 
(see the sketch below).
- (long term) try to remove a queue. This basically means getting rid of 
ZkClient. This is likely the approach that will be taken in KAFKA-5501.
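
A minimal sketch of the short-term fix (simplified; BrokerChangeSketch stands in for the real ControllerEvent, and the context diffing is omitted): re-read the broker list when the event is processed rather than trusting the children handed over by ZkClient.
{code}
import kafka.utils.ZkUtils

// hypothetical shape of the event; the real ControllerEvent trait is omitted
case class BrokerChangeSketch(zkUtils: ZkUtils) {
  def process(): Unit = {
    // fresh read: reflects /brokers/ids at processing time, not at the time the watch fired
    val curBrokers = zkUtils.getAllBrokersInCluster().toSet
    val curBrokerIds = curBrokers.map(_.id)
    // ... diff curBrokerIds against controllerContext.liveBrokerIds and handle new/dead brokers ...
  }
}
{code}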



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5501) use async zookeeper apis everywhere

2017-06-22 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-5501:
---

 Summary: use async zookeeper apis everywhere
 Key: KAFKA-5501
 URL: https://issues.apache.org/jira/browse/KAFKA-5501
 Project: Kafka
  Issue Type: Sub-task
Reporter: Onur Karaman
Assignee: Onur Karaman


Synchronous zookeeper writes mean that we wait an entire round trip before 
doing the next write. These synchronous writes are happening at a per-partition 
granularity in several places, so partition-heavy clusters suffer from the 
controller doing many sequential round trips to zookeeper.
* PartitionStateMachine.electLeaderForPartition updates leaderAndIsr in 
zookeeper on transition to OnlinePartition. This gets triggered per-partition 
sequentially with synchronous writes during controlled shutdown of the shutting 
down broker's replicas for which it is the leader.
* ReplicaStateMachine updates leaderAndIsr in zookeeper on transition to 
OfflineReplica when calling KafkaController.removeReplicaFromIsr. This gets 
triggered per-partition sequentially with synchronous writes for failed or 
controlled shutdown brokers.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers

2017-06-02 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16035465#comment-16035465
 ] 

Onur Karaman commented on KAFKA-4595:
-

[~ijuma] Yeah I think KAFKA-5028 solved this issue.

> Controller send thread can't stop when broker change listener event trigger 
> for  dead brokers
> -
>
> Key: KAFKA-4595
> URL: https://issues.apache.org/jira/browse/KAFKA-4595
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0, 0.10.1.1
>Reporter: Pengwei
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.1
>
>
> In our test env, we found controller is not working after a delete topic 
> opertation and network issue, the stack is below:
> "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" 
> #15 daemon prio=5 os_prio=0 tid=0x7fb76416e000 nid=0x3019 waiting on 
> condition [0x7fb76b7c8000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xc05497b8> (a 
> java.util.concurrent.CountDownLatch$Sync)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50)
>   at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32)
>   at 
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128)
>   at 
> kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81)
>   - locked <0xc0258760> (a java.lang.Object)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>   at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
>   at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
>   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>Locked ownable synchronizers:
>   - <0xc02587f8> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> "Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0 
> tid=0x7fb778342000 nid=0x5a4c waiting on condition [0x7fb761de]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xc02587f8> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>   at 
> 

[jira] [Commented] (KAFKA-1595) Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount

2017-06-02 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16035365#comment-16035365
 ] 

Onur Karaman commented on KAFKA-1595:
-

Sounds good.

> Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount
> -
>
> Key: KAFKA-1595
> URL: https://issues.apache.org/jira/browse/KAFKA-1595
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.1.1
>Reporter: Jagbir
>Assignee: Ismael Juma
>  Labels: newbie
>
> The following issue is created as a follow up suggested by Jun Rao
> in a kafka news group message with the Subject
> "Blocking Recursive parsing from 
> kafka.consumer.TopicCount$.constructTopicCount"
> SUMMARY:
> An issue was detected in a typical cluster of 3 kafka instances backed
> by 3 zookeeper instances (kafka version 0.8.1.1, scala version 2.10.3,
> java version 1.7.0_65). On consumer end, when consumers get recycled,
> there is a troubling JSON parsing recursion which takes a busy lock and
> blocks consumers thread pool.
> In 0.8.1.1 scala client library ZookeeperConsumerConnector.scala:355 takes
> a global lock (0xd3a7e1d0) during the rebalance, and fires an
> expensive JSON parsing, while keeping the other consumers from shutting
> down, see, e.g,
> at 
> kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161)
> The deep recursive JSON parsing should be deprecated in favor
> of a better JSON parser, see, e.g,
> http://engineering.ooyala.com/blog/comparing-scala-json-libraries?
> DETAILS:
> The first dump is for a recursive blocking thread holding the lock for 
> 0xd3a7e1d0
> and the subsequent dump is for a waiting thread.
> (Please grep for 0xd3a7e1d0 to see the locked object.)
>
> -8<-
> "Sa863f22b1e5hjh6788991800900b34545c_profile-a-prod1-s-140789080845312-c397945e8_watcher_executor"
> prio=10 tid=0x7f24dc285800 nid=0xda9 runnable [0x7f249e40b000]
> java.lang.Thread.State: RUNNABLE
> at 
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.p$7(Parsers.scala:722)
> at 
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.continue$1(Parsers.scala:726)
> at 
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:737)
> at 
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:721)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> 

[jira] [Comment Edited] (KAFKA-1595) Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount

2017-05-31 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16032276#comment-16032276
 ] 

Onur Karaman edited comment on KAFKA-1595 at 6/1/17 12:50 AM:
--

I think [~ijuma]'s PR (https://github.com/apache/kafka/pull/83) is worth 
reinvestigating as a means of improving the controller (especially with respect 
to controller initialization time).

I can help out if needed.


was (Author: onurkaraman):
I think [~ijuma]'s PR (https://github.com/apache/kafka/pull/83) is worth 
reinvestigating as a means of improving the controller (especially with respect 
to controller initialization time).

> Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount
> -
>
> Key: KAFKA-1595
> URL: https://issues.apache.org/jira/browse/KAFKA-1595
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.1.1
>Reporter: Jagbir
>Assignee: Ismael Juma
>  Labels: newbie
>
> The following issue is created as a follow up suggested by Jun Rao
> in a kafka news group message with the Subject
> "Blocking Recursive parsing from 
> kafka.consumer.TopicCount$.constructTopicCount"
> SUMMARY:
> An issue was detected in a typical cluster of 3 kafka instances backed
> by 3 zookeeper instances (kafka version 0.8.1.1, scala version 2.10.3,
> java version 1.7.0_65). On consumer end, when consumers get recycled,
> there is a troubling JSON parsing recursion which takes a busy lock and
> blocks consumers thread pool.
> In 0.8.1.1 scala client library ZookeeperConsumerConnector.scala:355 takes
> a global lock (0xd3a7e1d0) during the rebalance, and fires an
> expensive JSON parsing, while keeping the other consumers from shutting
> down, see, e.g,
> at 
> kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161)
> The deep recursive JSON parsing should be deprecated in favor
> of a better JSON parser, see, e.g,
> http://engineering.ooyala.com/blog/comparing-scala-json-libraries?
> DETAILS:
> The first dump is for a recursive blocking thread holding the lock for 
> 0xd3a7e1d0
> and the subsequent dump is for a waiting thread.
> (Please grep for 0xd3a7e1d0 to see the locked object.)
>
> -8<-
> "Sa863f22b1e5hjh6788991800900b34545c_profile-a-prod1-s-140789080845312-c397945e8_watcher_executor"
> prio=10 tid=0x7f24dc285800 nid=0xda9 runnable [0x7f249e40b000]
> java.lang.Thread.State: RUNNABLE
> at 
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.p$7(Parsers.scala:722)
> at 
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.continue$1(Parsers.scala:726)
> at 
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:737)
> at 
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:721)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> 

[jira] [Commented] (KAFKA-1595) Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount

2017-05-31 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16032276#comment-16032276
 ] 

Onur Karaman commented on KAFKA-1595:
-

I think [~ijuma]'s PR (https://github.com/apache/kafka/pull/83) is worth 
reinvestigating as a means of improving the controller (especially with respect 
to controller initialization time).

> Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount
> -
>
> Key: KAFKA-1595
> URL: https://issues.apache.org/jira/browse/KAFKA-1595
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.1.1
>Reporter: Jagbir
>Assignee: Ismael Juma
>  Labels: newbie
>
> The following issue is created as a follow up suggested by Jun Rao
> in a kafka news group message with the Subject
> "Blocking Recursive parsing from 
> kafka.consumer.TopicCount$.constructTopicCount"
> SUMMARY:
> An issue was detected in a typical cluster of 3 kafka instances backed
> by 3 zookeeper instances (kafka version 0.8.1.1, scala version 2.10.3,
> java version 1.7.0_65). On consumer end, when consumers get recycled,
> there is a troubling JSON parsing recursion which takes a busy lock and
> blocks consumers thread pool.
> In 0.8.1.1 scala client library ZookeeperConsumerConnector.scala:355 takes
> a global lock (0xd3a7e1d0) during the rebalance, and fires an
> expensive JSON parsing, while keeping the other consumers from shutting
> down, see, e.g,
> at 
> kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161)
> The deep recursive JSON parsing should be deprecated in favor
> of a better JSON parser, see, e.g,
> http://engineering.ooyala.com/blog/comparing-scala-json-libraries?
> DETAILS:
> The first dump is for a recursive blocking thread holding the lock for 
> 0xd3a7e1d0
> and the subsequent dump is for a waiting thread.
> (Please grep for 0xd3a7e1d0 to see the locked object.)
>
> -8<-
> "Sa863f22b1e5hjh6788991800900b34545c_profile-a-prod1-s-140789080845312-c397945e8_watcher_executor"
> prio=10 tid=0x7f24dc285800 nid=0xda9 runnable [0x7f249e40b000]
> java.lang.Thread.State: RUNNABLE
> at 
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.p$7(Parsers.scala:722)
> at 
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.continue$1(Parsers.scala:726)
> at 
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:737)
> at 
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:721)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at 
> 

[jira] [Commented] (KAFKA-5328) consider switching json parser from scala to jackson

2017-05-26 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027086#comment-16027086
 ] 

Onur Karaman commented on KAFKA-5328:
-

I copied the topic assignment znodes and partition state znodes from a cluster 
into two text files and just timed how long it took to parse the topic and 
partition znodes using both the scala json parser and jackson.

This experiment had 3066 topics and 95895 partitions (the actual cluster I got 
the znodes from was actually a third the size but I just duplicated lines to 
reflect some of our other clusters).

Here's the code:
{code}
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.tools

import java.nio.file.{Files, Paths}

import com.fasterxml.jackson.databind.ObjectMapper
import kafka.api.LeaderAndIsr
import kafka.common.TopicAndPartition
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.utils.Json

import scala.collection.JavaConverters._
import scala.collection.Seq

object JsonPerformance {
  def main(args: Array[String]): Unit = {
val assignments = 
Files.readAllLines(Paths.get("/Users/okaraman/code/read-zk-state/raw_assignments_big.txt"))
val states = 
Files.readAllLines(Paths.get("/Users/okaraman/code/read-zk-state/raw_states_big.txt"))

time(assignments.asScala.foreach(parseAssignmentWithScala), "scala json 
parser assignments")
time(assignments.asScala.foreach(parseAssignmentWithJackson), "jackson json 
parser assignments")
time(states.asScala.foreach(parseStateWithScala), "scala json parser 
states")
time(states.asScala.foreach(parseStateWithJackson), "jackson json parser 
states")
  }

  def time(f: => Unit, messagePrefix: String): Unit = {
val start = System.currentTimeMillis()
f
val end = System.currentTimeMillis()
println(messagePrefix + s" took: ${end - start} ms")
  }

  def parseAssignmentWithScala(assignment: String): Map[TopicAndPartition, 
Seq[Int]] = {
val json = Json.parseFull(assignment).get
val jsonPartitions = json.asInstanceOf[Map[String, 
Any]].get("partitions").get
val replicaMap = jsonPartitions.asInstanceOf[Map[String, Seq[Int]]]
replicaMap.map { case (partition, replicas) => TopicAndPartition("t", 
partition.toInt) -> replicas }
  }

  def parseAssignmentWithJackson(assignment: String): Map[TopicAndPartition, 
Seq[Int]] = {
val mapper = new ObjectMapper()
val json = mapper.readTree(assignment)
json.get("partitions").asScala.toList.flatMap(x => (0 until x.size()).map(i 
=> TopicAndPartition("t", i) -> x.get(i).asScala.toSeq.map(_.asInt(.toMap
  }

  def parseStateWithScala(state: String): LeaderIsrAndControllerEpoch = {
val json = Json.parseFull(state).get
val leaderIsrAndEpochInfo = json.asInstanceOf[Map[String, Any]]
val leader = leaderIsrAndEpochInfo.get("leader").get.asInstanceOf[Int]
val epoch = leaderIsrAndEpochInfo.get("leader_epoch").get.asInstanceOf[Int]
val isr = leaderIsrAndEpochInfo.get("isr").get.asInstanceOf[List[Int]]
val controllerEpoch = 
leaderIsrAndEpochInfo.get("controller_epoch").get.asInstanceOf[Int]
LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, 1), 
controllerEpoch)
  }

  def parseStateWithJackson(state: String): LeaderIsrAndControllerEpoch = {
val mapper = new ObjectMapper()
val json = mapper.readTree(state)
val leader = json.get("leader").asInt()
val epoch = json.get("leader_epoch").asInt()
val isr = json.get("isr").asScala.toList.map(_.asInt())
val controllerEpoch = json.get("controller_epoch").asInt()
LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, 1), 
controllerEpoch)
  }
}
{code}

Here's the output:
{code}
scala json parser assignments took: 8730 ms
jackson json parser assignments took: 354 ms
scala json parser states took: 28395 ms
jackson json parser states took: 360 ms
{code}

So controller initialization time spent on json parsing would be reduced from 
37.1 seconds down to 0.7 seconds.

> consider switching json parser from scala to jackson
> 
>
> Key: KAFKA-5328
> URL: 

[jira] [Created] (KAFKA-5328) consider switching json parser from scala to jackson

2017-05-25 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-5328:
---

 Summary: consider switching json parser from scala to jackson
 Key: KAFKA-5328
 URL: https://issues.apache.org/jira/browse/KAFKA-5328
 Project: Kafka
  Issue Type: Sub-task
Reporter: Onur Karaman
Assignee: Onur Karaman


The scala json parser is significantly slower than jackson.

This can have a nontrivial impact on controller initialization since the 
controller loads and parses almost all zookeeper state.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5323) AdminUtils.createTopic should check topic existence upfront

2017-05-25 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-5323:

Status: Patch Available  (was: Open)

> AdminUtils.createTopic should check topic existence upfront
> ---
>
> Key: KAFKA-5323
> URL: https://issues.apache.org/jira/browse/KAFKA-5323
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> When a topic exists, AdminUtils.createTopic unnecessarily does N+2 zookeeper 
> reads where N is the number of brokers. Here is the breakdown of the N+2 
> zookeeper reads:
> # reads the current list of brokers in zookeeper (1 zookeeper read)
> # reads metadata for each broker in zookeeper (N zookeeper reads where N is 
> the number of brokers)
> # checks for topic existence in zookeeper (1 zookeeper read)
> We can reduce the N+2 reads down to 1 by checking topic existence upfront.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5323) AdminUtils.createTopic should check topic existence upfront

2017-05-24 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16024223#comment-16024223
 ] 

Onur Karaman commented on KAFKA-5323:
-

This can have a larger impact than one might initially suspect. For instance, a 
broker only populates its MetadataCache after it has joined the cluster and the 
controller sends it an UpdateMetadataRequest. But a broker can begin processing 
requests even before registering itself in zookeeper (before the controller 
even knows the broker is alive). In other words, a broker can begin processing 
MetadataRequests before processing the controller's UpdateMetadataRequest 
following broker registration.

Processing these MetadataRequests in this scenario leads to large local times 
and can cause substantial request queue backup, causing significant delays in 
the broker processing its initial UpdateMetadataRequest. Since the broker 
hasn't received any UpdateMetadataRequest from the controller yet, its 
MetadataCache is empty. So the topics from all the client MetadataRequests are 
treated as brand new topics, which means the broker tries to auto create these 
topics. For each pre-existing topic queried in the MetadataRequest, auto topic 
creation performs the N+2 zookeeper reads mentioned earlier.

In one bad production scenario (while recovering from KAFKA-4959), this caused 
a significant delay in bringing replicas online, as both the initial 
LeaderAndIsrRequest and UpdateMetadataRequest from the controller on broker 
startup were stuck behind these client MetadataRequests hammering zookeeper.

> AdminUtils.createTopic should check topic existence upfront
> ---
>
> Key: KAFKA-5323
> URL: https://issues.apache.org/jira/browse/KAFKA-5323
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> When a topic exists, AdminUtils.createTopic unnecessarily does N+2 zookeeper 
> reads where N is the number of brokers. Here is the breakdown of the N+2 
> zookeeper reads:
> # reads the current list of brokers in zookeeper (1 zookeeper read)
> # reads metadata for each broker in zookeeper (N zookeeper reads where N is 
> the number of brokers)
> # checks for topic existence in zookeeper (1 zookeeper read)
> We can reduce the N+2 reads down to 1 by checking topic existence upfront.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5323) AdminUtils.createTopic should check topic existence upfront

2017-05-24 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-5323:
---

 Summary: AdminUtils.createTopic should check topic existence 
upfront
 Key: KAFKA-5323
 URL: https://issues.apache.org/jira/browse/KAFKA-5323
 Project: Kafka
  Issue Type: Improvement
Reporter: Onur Karaman
Assignee: Onur Karaman


When a topic exists, AdminUtils.createTopic unnecessarily does N+2 zookeeper 
reads where N is the number of brokers. Here is the breakdown of the N+2 
zookeeper reads:
# reads the current list of brokers in zookeeper (1 zookeeper read)
# reads metadata for each broker in zookeeper (N zookeeper reads where N is the 
number of brokers)
# checks for topic existence in zookeeper (1 zookeeper read)

We can reduce the N+2 reads down to 1 by checking topic existence upfront.
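
A minimal sketch of the proposed ordering (method and helper names are illustrative, not the actual AdminUtils change):
{code}
import kafka.utils.ZkUtils
import org.apache.kafka.common.errors.TopicExistsException

object CreateTopicSketch {
  // `doCreate` stands in for the rest of AdminUtils.createTopic (broker metadata reads, assignment, ...)
  def createTopicIfAbsent(zkUtils: ZkUtils, topic: String)(doCreate: => Unit): Unit = {
    if (zkUtils.pathExists(ZkUtils.getTopicPath(topic)))   // the single upfront zookeeper read
      throw new TopicExistsException(s"Topic '$topic' already exists.")
    doCreate                                               // only genuinely new topics pay for the remaining N+1 reads
  }
}
{code}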



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5310) reset ControllerContext during resignation

2017-05-23 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-5310:

Status: Patch Available  (was: Open)

> reset ControllerContext during resignation
> --
>
> Key: KAFKA-5310
> URL: https://issues.apache.org/jira/browse/KAFKA-5310
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> This ticket is all about ControllerContext initialization and teardown. The 
> key points are:
> 1. we should teardown ControllerContext during resignation instead of waiting 
> on election to fix it up. A heapdump shows that the former controller keeps 
> pretty much all of its ControllerContext state laying around.
> 2. we don't properly teardown/reset 
> {{ControllerContext.partitionsBeingReassigned}}. This caused problems for us 
> in a production cluster at linkedin as shown in the scenario below:
> {code}
> > rm -rf /tmp/zookeeper/ /tmp/kafka-logs* logs*
> > ./gradlew clean jar
> > ./bin/zookeeper-server-start.sh config/zookeeper.properties
> > export LOG_DIR=logs0 && ./bin/kafka-server-start.sh 
> > config/server0.properties
> > export LOG_DIR=logs1 && ./bin/kafka-server-start.sh 
> > config/server1.properties
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t 
> > --replica-assignment 1
> > ./bin/zookeeper-shell.sh localhost:2181
> get /brokers/topics/t
> {"version":1,"partitions":{"0":[1]}}
> create /admin/reassign_partitions 
> {"partitions":[{"topic":"t","partition":0,"replicas":[1,2]}],"version":1}
> Created /admin/reassign_partitions
> get /brokers/topics/t
> {"version":1,"partitions":{"0":[1,2]}}
> get /admin/reassign_partitions
> {"version":1,"partitions":[{"topic":"t","partition":0,"replicas":[1,2]}]}
> delete /admin/reassign_partitions
> delete /controller
> get /brokers/topics/t
> {"version":1,"partitions":{"0":[1,2]}}
> get /admin/reassign_partitions
> Node does not exist: /admin/reassign_partitions
> > echo 
> > '{"partitions":[{"topic":"t","partition":0,"replicas":[1]}],"version":1}' > 
> > reassignment.txt
> > ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 
> > --reassignment-json-file reassignment.txt --execute
> get /brokers/topics/t
> {"version":1,"partitions":{"0":[1]}}
> get /admin/reassign_partitions
> Node does not exist: /admin/reassign_partitions
> delete /controller
> get /brokers/topics/t
> {"version":1,"partitions":{"0":[1,2]}}
> get /admin/reassign_partitions
> Node does not exist: /admin/reassign_partitions
> {code}
> Notice that the replica set goes from \[1\] to \[1,2\] (as expected with the 
> explicit {{/admin/reassign_partitions}} znode creation during the initial 
> controller) back to \[1\] (as expected with the partition reassignment during 
> the second controller) and again back to \[1,2\] after the original 
> controller gets re-elected.
> That last transition from \[1\] to \[1,2\] is unexpected. It's due to the 
> original controller not resetting its 
> {{ControllerContext.partitionsBeingReassigned}} correctly. 
> {{initializePartitionReassignment}} simply adds to what's already in 
> {{ControllerContext.partitionsBeingReassigned}}.
> The explicit {{/admin/reassign_partitions}} znode creation is to circumvent 
> KAFKA-5161 (95b48b157aca44beec4335e62a59f37097fe7499). Doing so is valid 
> since:
> 1. our code in production doesn't have that change
> 2. KAFKA-5161 doesn't address the underlying race condition between a broker 
> failure and the ReassignPartitionsCommand tool creating the znode.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5310) reset ControllerContext during resignation

2017-05-23 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-5310:

Description: 
This ticket is all about ControllerContext initialization and teardown. The key 
points are:
1. we should tear down ControllerContext during resignation instead of waiting 
for election to fix it up. A heap dump shows that the former controller keeps 
pretty much all of its ControllerContext state lying around.
2. we don't properly tear down/reset 
{{ControllerContext.partitionsBeingReassigned}}. This caused problems for us in 
a production cluster at LinkedIn as shown in the scenario below:
{code}
> rm -rf /tmp/zookeeper/ /tmp/kafka-logs* logs*
> ./gradlew clean jar
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
> export LOG_DIR=logs0 && ./bin/kafka-server-start.sh config/server0.properties
> export LOG_DIR=logs1 && ./bin/kafka-server-start.sh config/server1.properties
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t 
> --replica-assignment 1
> ./bin/zookeeper-shell.sh localhost:2181

get /brokers/topics/t
{"version":1,"partitions":{"0":[1]}}

create /admin/reassign_partitions 
{"partitions":[{"topic":"t","partition":0,"replicas":[1,2]}],"version":1}
Created /admin/reassign_partitions

get /brokers/topics/t
{"version":1,"partitions":{"0":[1,2]}}

get /admin/reassign_partitions
{"version":1,"partitions":[{"topic":"t","partition":0,"replicas":[1,2]}]}

delete /admin/reassign_partitions
delete /controller

get /brokers/topics/t
{"version":1,"partitions":{"0":[1,2]}}

get /admin/reassign_partitions
Node does not exist: /admin/reassign_partitions

> echo 
> '{"partitions":[{"topic":"t","partition":0,"replicas":[1]}],"version":1}' > 
> reassignment.txt
> ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 
> --reassignment-json-file reassignment.txt --execute

get /brokers/topics/t
{"version":1,"partitions":{"0":[1]}}

get /admin/reassign_partitions
Node does not exist: /admin/reassign_partitions

delete /controller

get /brokers/topics/t
{"version":1,"partitions":{"0":[1,2]}}

get /admin/reassign_partitions
Node does not exist: /admin/reassign_partitions
{code}

Notice that the replica set goes from \[1\] to \[1,2\] (as expected with the 
explicit {{/admin/reassign_partitions}} znode creation during the initial 
controller) back to \[1\] (as expected with the partition reassignment during 
the second controller) and again back to \[1,2\] after the original controller 
gets re-elected.

That last transition from \[1\] to \[1,2\] is unexpected. It's due to the 
original controller not resetting its 
{{ControllerContext.partitionsBeingReassigned}} correctly. 
{{initializePartitionReassignment}} simply adds to what's already in 
{{ControllerContext.partitionsBeingReassigned}}.

The explicit {{/admin/reassign_partitions}} znode creation is to circumvent 
KAFKA-5161 (95b48b157aca44beec4335e62a59f37097fe7499). Doing so is valid since:
1. our code in production doesn't have that change
2. KAFKA-5161 doesn't address the underlying race condition between a broker 
failure and the ReassignPartitionsCommand tool creating the znode.

  was:
This ticket is all about ControllerContext initialization and teardown. The key 
points are:
1. we should teardown ControllerContext during resignation instead of waiting 
on election to fix it up. A heapdump shows that the former controller keeps 
pretty much all of its ControllerContext state laying around.
2. we don't properly teardown/reset 
`ControllerContext.partitionsBeingReassigned`. This caused problems for us in a 
production cluster at linkedin as shown in the scenario below:
{code}
> rm -rf /tmp/zookeeper/ /tmp/kafka-logs* logs*
> ./gradlew clean jar
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
> export LOG_DIR=logs0 && ./bin/kafka-server-start.sh config/server0.properties
> export LOG_DIR=logs1 && ./bin/kafka-server-start.sh config/server1.properties
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t 
> --replica-assignment 1
> ./bin/zookeeper-shell.sh localhost:2181

get /brokers/topics/t
{"version":1,"partitions":{"0":[1]}}

create /admin/reassign_partitions 
{"partitions":[{"topic":"t","partition":0,"replicas":[1,2]}],"version":1}
Created /admin/reassign_partitions

get /brokers/topics/t
{"version":1,"partitions":{"0":[1,2]}}

get /admin/reassign_partitions
{"version":1,"partitions":[{"topic":"t","partition":0,"replicas":[1,2]}]}

delete /admin/reassign_partitions
delete /controller

get /brokers/topics/t
{"version":1,"partitions":{"0":[1,2]}}

get /admin/reassign_partitions
Node does not exist: /admin/reassign_partitions

> echo 
> '{"partitions":[{"topic":"t","partition":0,"replicas":[1]}],"version":1}' > 
> reassignment.txt
> ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 
> --reassignment-json-file reassignment.txt --execute

get /brokers/topics/t

[jira] [Created] (KAFKA-5310) reset ControllerContext during resignation

2017-05-23 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-5310:
---

 Summary: reset ControllerContext during resignation
 Key: KAFKA-5310
 URL: https://issues.apache.org/jira/browse/KAFKA-5310
 Project: Kafka
  Issue Type: Sub-task
Reporter: Onur Karaman
Assignee: Onur Karaman


This ticket is all about ControllerContext initialization and teardown. The key 
points are:
1. we should tear down ControllerContext during resignation instead of waiting 
for election to fix it up. A heap dump shows that the former controller keeps 
pretty much all of its ControllerContext state lying around.
2. we don't properly tear down/reset 
`ControllerContext.partitionsBeingReassigned`. This caused problems for us in a 
production cluster at LinkedIn as shown in the scenario below:
{code}
> rm -rf /tmp/zookeeper/ /tmp/kafka-logs* logs*
> ./gradlew clean jar
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
> export LOG_DIR=logs0 && ./bin/kafka-server-start.sh config/server0.properties
> export LOG_DIR=logs1 && ./bin/kafka-server-start.sh config/server1.properties
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t 
> --replica-assignment 1
> ./bin/zookeeper-shell.sh localhost:2181

get /brokers/topics/t
{"version":1,"partitions":{"0":[1]}}

create /admin/reassign_partitions 
{"partitions":[{"topic":"t","partition":0,"replicas":[1,2]}],"version":1}
Created /admin/reassign_partitions

get /brokers/topics/t
{"version":1,"partitions":{"0":[1,2]}}

get /admin/reassign_partitions
{"version":1,"partitions":[{"topic":"t","partition":0,"replicas":[1,2]}]}

delete /admin/reassign_partitions
delete /controller

get /brokers/topics/t
{"version":1,"partitions":{"0":[1,2]}}

get /admin/reassign_partitions
Node does not exist: /admin/reassign_partitions

> echo 
> '{"partitions":[{"topic":"t","partition":0,"replicas":[1]}],"version":1}' > 
> reassignment.txt
> ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 
> --reassignment-json-file reassignment.txt --execute

get /brokers/topics/t
{"version":1,"partitions":{"0":[1]}}

get /admin/reassign_partitions
Node does not exist: /admin/reassign_partitions

delete /controller

get /brokers/topics/t
{"version":1,"partitions":{"0":[1,2]}}

get /admin/reassign_partitions
Node does not exist: /admin/reassign_partitions
{code}

Notice that the replica set goes from \[1\] to \[1,2\] (as expected with the 
explicit `/admin/reassign_partitions` znode creation during the initial 
controller) back to \[1\] (as expected with the partition reassignment during 
the second controller) and again back to \[1,2\] after the original controller 
gets re-elected.

That last transition from \[1\] to \[1,2\] is unexpected. It's due to the 
original controller not resetting its 
`ControllerContext.partitionsBeingReassigned` correctly. 
`initializePartitionReassignment` simply adds to what's already in 
`ControllerContext.partitionsBeingReassigned`.

The explicit `/admin/reassign_partitions` znode creation is to circumvent 
KAFKA-5161 (95b48b157aca44beec4335e62a59f37097fe7499). Doing so is valid since:
1. our code in production doesn't have that change
2. KAFKA-5161 doesn't address the underlying race condition between a broker 
failure and the ReassignPartitionsCommand tool creating the znode.

It looks like this bug has been around for quite some time (definitely before 
0.10.2).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4896) Offset loading can use more threads

2017-05-22 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16020447#comment-16020447
 ] 

Onur Karaman commented on KAFKA-4896:
-

handleGroupImmigration only gets called from handling a LeaderAndIsrRequest. 
Its cache load is synchronized over the ReplicaManager's replicaStateChangeLock.

handleGroupEmigration gets called in two places: when handling a 
LeaderAndIsrRequest as well as from StopReplicaRequest. Its cache removal is 
synchronized over the ReplicaManager's replicaStateChangeLock only in the 
LeaderAndIsrRequest handling path.

I think even with today's single scheduler thread, we might have an interleaved 
load and removal problem when the following are happening concurrently:
1. handleGroupImmigration is called for a partition P through 
LeaderAndIsrRequest
2. handleGroupEmigration is called for a partition P through StopReplicaRequest

This should be rare. The controller does a blocking send and receive, so I think 
this should only happen if:
1. one of the requests times out while the controller stays the same
2. the controller moves

[~junrao] does this edge case sound right?

> Offset loading can use more threads
> ---
>
> Key: KAFKA-4896
> URL: https://issues.apache.org/jira/browse/KAFKA-4896
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.10.2.0
>Reporter: Jun Rao
>Assignee: Abhishek Mendhekar
>  Labels: newbie
>
> Currently, in GroupMetadataManager, we have a single thread for loading the 
> offset cache. We could speed it up with more threads.
>  /* single-thread scheduler to handle offset/group metadata cache loading and 
> unloading */
>   private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = 
> "group-metadata-manager-")



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5258) move all partition and replica state transition rules into their states

2017-05-17 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-5258:

Description: 
Today the PartitionStateMachine and ReplicaStateMachine define and assert the 
valid state transitions inline for each state, looking something like:
{code}
private def handleStateChange(...) {
  targetState match {
case stateA => {
  assertValidPreviousStates(topicAndPartition, List(stateX, stateY, 
stateZ), stateA)
  // actual work
}
case stateB => {
  assertValidPreviousStates(topicAndPartition, List(stateD, stateE), stateB)
  // actual work
}
  }
}
{code}
It would be cleaner to move all partition and replica state transition rules 
into their states and simply do the assertion at the top of the 
handleStateChange method like so:
{code}
private def handleStateChange(...) {
  assertValidTransition(targetState)
  targetState match {
case stateA => {
  // actual work
}
case stateB => {
  // actual work
}
  }
}

sealed trait State {
  def state: Byte
  def validPreviousStates: Set[State]
}

case object StateA extends State {
  val state: Byte = 1
  val validPreviousStates: Set[State] = Set(StateX)
}

case object StateB extends State {
  val state: Byte = 2
  val validPreviousStates: Set[State] = Set(StateX, StateY, StateZ)
}
{code}

  was:
Today the PartitionStateMachine and ReplicaStateMachine defines and asserts the 
valid state transitions inline for each state, looking something like:
{code}
private def handleStateChange(...) {
  targetState match {
case stateA => {
  assertValidPreviousStates(topicAndPartition, List(stateX, stateY, 
stateZ), stateA)
  // actual work
}
case stateB => {
  assertValidPreviousStates(topicAndPartition, List(stateD, stateE), stateB)
  // actual work
}
  }
}
{code}
It would be cleaner to move all partition and replica state transition rules 
into their and simply do the assertion at the top of the handleStateChange 
method like so:
{code}
private def handleStateChange(...) {
  assertValidTransition(targetState)
  targetState match {
case stateA => {
  // actual work
}
case stateB => {
  // actual work
}
  }
}

sealed trait State {
  def state: Byte
  def validPreviousStates: Set[State]
}

case object StateA extends State {
  val state: Byte = 1
  val validPreviousStates: Set[State] = Set(StateX)
}

case object StateB extends State {
  val state: Byte = 2
  val validPreviousStates: Set[State] = Set(StateX, StateY, StateZ)
}
{code}


> move all partition and replica state transition rules into their states
> ---
>
> Key: KAFKA-5258
> URL: https://issues.apache.org/jira/browse/KAFKA-5258
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>Priority: Minor
>
> Today the PartitionStateMachine and ReplicaStateMachine define and assert 
> the valid state transitions inline for each state, looking something like:
> {code}
> private def handleStateChange(...) {
>   targetState match {
> case stateA => {
>   assertValidPreviousStates(topicAndPartition, List(stateX, stateY, 
> stateZ), stateA)
>   // actual work
> }
> case stateB => {
>   assertValidPreviousStates(topicAndPartition, List(stateD, stateE), 
> stateB)
>   // actual work
> }
>   }
> }
> {code}
> It would be cleaner to move all partition and replica state transition rules 
> into their states and simply do the assertion at the top of the 
> handleStateChange method like so:
> {code}
> private def handleStateChange(...) {
>   assertValidTransition(targetState)
>   targetState match {
> case stateA => {
>   // actual work
> }
> case stateB => {
>   // actual work
> }
>   }
> }
> sealed trait State {
>   def state: Byte
>   def validPreviousStates: Set[State]
> }
> case object StateA extends State {
>   val state: Byte = 1
>   val validPreviousStates: Set[State] = Set(StateX)
> }
> case object StateB extends State {
>   val state: Byte = 2
>   val validPreviousStates: Set[State] = Set(StateX, StateY, StateZ)
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5258) move all partition and replica state transition rules into their states

2017-05-17 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-5258:

Description: 
Today the PartitionStateMachine and ReplicaStateMachine define and assert the 
valid state transitions inline for each state, looking something like:
{code}
private def handleStateChange(...) {
  targetState match {
case stateA => {
  assertValidPreviousStates(topicAndPartition, List(stateX, stateY, 
stateZ), stateA)
  // actual work
}
case stateB => {
  assertValidPreviousStates(topicAndPartition, List(stateD, stateE), stateB)
  // actual work
}
  }
}
{code}
It would be cleaner to move all partition and replica state transition rules 
into their and simply do the assertion at the top of the handleStateChange 
method like so:
{code}
private def handleStateChange(...) {
  assertValidTransition(targetState)
  targetState match {
case stateA => {
  // actual work
}
case stateB => {
  // actual work
}
  }
}

sealed trait State {
  def state: Byte
  def validPreviousStates: Set[State]
}

case object StateA extends State {
  val state: Byte = 1
  val validPreviousStates: Set[State] = Set(StateX)
}

case object StateB extends State {
  val state: Byte = 2
  val validPreviousStates: Set[State] = Set(StateX, StateY, StateZ)
}
{code}

  was:
Today the PartitionStateMachine and ReplicaStateMachine defines and asserts the 
valid state transitions inline for each state, looking something like:
{code}
private def handleStateChange(...) {
  targetState match {
case stateA => {
  assertValidPreviousStates(topicAndPartition, List(stateX, stateY, 
stateZ), stateA)
  // actual work
}
case stateB => {
  assertValidPreviousStates(topicAndPartition, List(stateD, stateE), stateB)
  // actual work
}
  }
}
{code}
It would be cleaner to move all partition and replica state transition rules 
into a map and simply do the assertion at the top of the handleStateChange 
method like so:
{code}
private val validPreviousStates: Map[State, Set[State]] = ...
private def handleStateChange(...) {
  assertValidTransition(targetState)
  targetState match {
case stateA => {
  // actual work
}
case stateB => {
  // actual work
}
  }
}
{code}


> move all partition and replica state transition rules into their states
> ---
>
> Key: KAFKA-5258
> URL: https://issues.apache.org/jira/browse/KAFKA-5258
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>Priority: Minor
>
> Today the PartitionStateMachine and ReplicaStateMachine define and assert 
> the valid state transitions inline for each state, looking something like:
> {code}
> private def handleStateChange(...) {
>   targetState match {
> case stateA => {
>   assertValidPreviousStates(topicAndPartition, List(stateX, stateY, 
> stateZ), stateA)
>   // actual work
> }
> case stateB => {
>   assertValidPreviousStates(topicAndPartition, List(stateD, stateE), 
> stateB)
>   // actual work
> }
>   }
> }
> {code}
> It would be cleaner to move all partition and replica state transition rules 
> into their and simply do the assertion at the top of the handleStateChange 
> method like so:
> {code}
> private def handleStateChange(...) {
>   assertValidTransition(targetState)
>   targetState match {
> case stateA => {
>   // actual work
> }
> case stateB => {
>   // actual work
> }
>   }
> }
> sealed trait State {
>   def state: Byte
>   def validPreviousStates: Set[State]
> }
> case object StateA extends State {
>   val state: Byte = 1
>   val validPreviousStates: Set[State] = Set(StateX)
> }
> case object StateB extends State {
>   val state: Byte = 2
>   val validPreviousStates: Set[State] = Set(StateX, StateY, StateZ)
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5258) move all partition and replica state transition rules into their states

2017-05-17 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-5258:

Summary: move all partition and replica state transition rules into their 
states  (was: move all partition and replica state transition rules into a map)

> move all partition and replica state transition rules into their states
> ---
>
> Key: KAFKA-5258
> URL: https://issues.apache.org/jira/browse/KAFKA-5258
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>Priority: Minor
>
> Today the PartitionStateMachine and ReplicaStateMachine define and assert 
> the valid state transitions inline for each state, looking something like:
> {code}
> private def handleStateChange(...) {
>   targetState match {
> case stateA => {
>   assertValidPreviousStates(topicAndPartition, List(stateX, stateY, 
> stateZ), stateA)
>   // actual work
> }
> case stateB => {
>   assertValidPreviousStates(topicAndPartition, List(stateD, stateE), 
> stateB)
>   // actual work
> }
>   }
> }
> {code}
> It would be cleaner to move all partition and replica state transition rules 
> into a map and simply do the assertion at the top of the handleStateChange 
> method like so:
> {code}
> private val validPreviousStates: Map[State, Set[State]] = ...
> private def handleStateChange(...) {
>   assertValidTransition(targetState)
>   targetState match {
> case stateA => {
>   // actual work
> }
> case stateB => {
>   // actual work
> }
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5261) Performance improvement of SimpleAclAuthorizer

2017-05-17 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16013665#comment-16013665
 ] 

Onur Karaman commented on KAFKA-5261:
-

Would one alternative be to just make a CachedAclAuthorizer which decorates the 
SimpleAclAuthorizer with a cache?
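As a rough sketch of that idea: the authorize signature below matches Kafka's 
Authorizer trait, while the cache, the key shape and the invalidation hook are 
assumptions, and the remaining Authorizer methods are omitted.
{code}
import java.util.concurrent.ConcurrentHashMap
import kafka.network.RequestChannel.Session
import kafka.security.auth.{Authorizer, Operation, Resource}

// Sketch only: cache authorize() results and clear the cache whenever the
// underlying acls change. A real decorator would also delegate addAcls,
// removeAcls, getAcls, configure and close to the wrapped authorizer.
class CachedAclAuthorizer(underlying: Authorizer) {
  private val cache = new ConcurrentHashMap[(Session, Operation, Resource), java.lang.Boolean]()

  def authorize(session: Session, operation: Operation, resource: Resource): Boolean = {
    val key = (session, operation, resource)
    val cached = cache.get(key)
    if (cached != null) cached.booleanValue()
    else {
      val allowed = underlying.authorize(session, operation, resource)
      cache.put(key, allowed)
      allowed
    }
  }

  // Invoke this from whatever watches the acl cache for changes.
  def invalidate(): Unit = cache.clear()
}
{code}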

> Performance improvement of SimpleAclAuthorizer
> --
>
> Key: KAFKA-5261
> URL: https://issues.apache.org/jira/browse/KAFKA-5261
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.10.2.1
>Reporter: Stephane Maarek
>
> Currently, looking at the KafkaApis class, it seems that every request going 
> through Kafka is also going through an authorize check:
> {code}
>   private def authorize(session: Session, operation: Operation, resource: 
> Resource): Boolean =
> authorizer.forall(_.authorize(session, operation, resource))
> {code}
> The SimpleAclAuthorizer logic runs through checks which all look to be done 
> in linear time (except on first run) proportional to the number of acls on a 
> specific resource. This operation is re-run every time a client tries to use 
> a Kafka Api, especially on the very often called `handleProducerRequest` and  
> `handleFetchRequest`
> I believe a cache could be built to store the result of the authorize call, 
> possibly allowing more expensive authorize() calls to happen, and reducing 
> greatly the CPU usage in the long run. The cache would be invalidated every 
> time a change happens to aclCache
> Thoughts before I try giving it a go with a PR? 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5258) move all partition and replica state transition rules into a map

2017-05-16 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-5258:

Status: Patch Available  (was: In Progress)

> move all partition and replica state transition rules into a map
> 
>
> Key: KAFKA-5258
> URL: https://issues.apache.org/jira/browse/KAFKA-5258
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>Priority: Minor
>
> Today the PartitionStateMachine and ReplicaStateMachine define and assert 
> the valid state transitions inline for each state, looking something like:
> {code}
> private def handleStateChange(...) {
>   targetState match {
> case stateA => {
>   assertValidPreviousStates(topicAndPartition, List(stateX, stateY, 
> stateZ), stateA)
>   // actual work
> }
> case stateB => {
>   assertValidPreviousStates(topicAndPartition, List(stateD, stateE), 
> stateB)
>   // actual work
> }
>   }
> }
> {code}
> It would be cleaner to move all partition and replica state transition rules 
> into a map and simply do the assertion at the top of the handleStateChange 
> method like so:
> {code}
> private val validPreviousStates: Map[State, Set[State]] = ...
> private def handleStateChange(...) {
>   assertValidTransition(targetState)
>   targetState match {
> case stateA => {
>   // actual work
> }
> case stateB => {
>   // actual work
> }
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Work started] (KAFKA-5258) move all partition and replica state transition rules into a map

2017-05-16 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-5258 started by Onur Karaman.
---
> move all partition and replica state transition rules into a map
> 
>
> Key: KAFKA-5258
> URL: https://issues.apache.org/jira/browse/KAFKA-5258
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>Priority: Minor
>
> Today the PartitionStateMachine and ReplicaStateMachine define and assert 
> the valid state transitions inline for each state, looking something like:
> {code}
> private def handleStateChange(...) {
>   targetState match {
> case stateA => {
>   assertValidPreviousStates(topicAndPartition, List(stateX, stateY, 
> stateZ), stateA)
>   // actual work
> }
> case stateB => {
>   assertValidPreviousStates(topicAndPartition, List(stateD, stateE), 
> stateB)
>   // actual work
> }
>   }
> }
> {code}
> It would be cleaner to move all partition and replica state transition rules 
> into a map and simply do the assertion at the top of the handleStateChange 
> method like so:
> {code}
> private val validPreviousStates: Map[State, Set[State]] = ...
> private def handleStateChange(...) {
>   assertValidTransition(targetState)
>   targetState match {
> case stateA => {
>   // actual work
> }
> case stateB => {
>   // actual work
> }
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5258) move all partition and replica state transition rules into a map

2017-05-16 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-5258:
---

 Summary: move all partition and replica state transition rules 
into a map
 Key: KAFKA-5258
 URL: https://issues.apache.org/jira/browse/KAFKA-5258
 Project: Kafka
  Issue Type: Sub-task
Reporter: Onur Karaman
Assignee: Onur Karaman
Priority: Minor


Today the PartitionStateMachine and ReplicaStateMachine define and assert the 
valid state transitions inline for each state, looking something like:
{code}
private def handleStateChange(...) {
  targetState match {
case stateA => {
  assertValidPreviousStates(topicAndPartition, List(stateX, stateY, 
stateZ), stateA)
  // actual work
}
case stateB => {
  assertValidPreviousStates(topicAndPartition, List(stateD, stateE), stateB)
  // actual work
}
  }
}
{code}
It would be cleaner to move all partition and replica state transition rules 
into a map and simply do the assertion at the top of the handleStateChange 
method like so:
{code}
private val validPreviousStates: Map[State, Set[State]] = ...
private def handleStateChange(...) {
  assertValidTransition(targetState)
  targetState match {
case stateA => {
  // actual work
}
case stateB => {
  // actual work
}
  }
}
{code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (KAFKA-5175) Transient failure: ControllerIntegrationTest.testPreferredReplicaLeaderElection

2017-05-16 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011851#comment-16011851
 ] 

Onur Karaman edited comment on KAFKA-5175 at 5/16/17 7:01 AM:
--

Cool. I can reliably reproduce this by inserting a long sleep in 
Partition.maybeExpandIsr:
{code}
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index 1d13689..b811d31 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -281,6 +281,7 @@ class Partition(val topic: String,
   def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult): Boolean = {
 inWriteLock(leaderIsrUpdateLock) {
   // check if this replica needs to be added to the ISR
+  Thread.sleep(10)
   leaderReplicaIfLocal match {
 case Some(leaderReplica) =>
   val replica = getReplica(replicaId).get
{code}

This makes me think that the controller is processing the preferred replica 
leader election before the restarted broker (the preferred replica leader) has 
rejoined the ISR, causing the preferred replica leader election to fail and the 
final ZooKeeper state validation to fail.

{code}
kafka.controller.ControllerIntegrationTest > testPreferredReplicaLeaderElection 
FAILED
java.lang.AssertionError: failed to get expected partition state upon 
broker startup
at kafka.utils.TestUtils$.fail(TestUtils.scala:323)
at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:823)
at 
kafka.controller.ControllerIntegrationTest.waitForPartitionState(ControllerIntegrationTest.scala:291)
at 
kafka.controller.ControllerIntegrationTest.testPreferredReplicaLeaderElection(ControllerIntegrationTest.scala:204)
{code}
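One way the test could be hardened against this race, as a sketch: wait for the 
ISR first. The ZkUtils and TestUtils methods used exist; zkUtils, the 
topic-partition and the preferred replica id are assumed to be in scope in the 
test.
{code}
// Sketch only: wait for the restarted broker (the preferred replica) to rejoin
// the ISR before triggering preferred replica leader election in the test.
TestUtils.waitUntilTrue(() =>
  zkUtils.getInSyncReplicasForPartition(tp.topic, tp.partition).contains(preferredReplica),
  "timed out waiting for the preferred replica to rejoin the ISR")
{code}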


was (Author: onurkaraman):
Cool. I can steadily reproduce this by inserting a long sleep in 
Partition.maybeExpandIsr:
{code}
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index 1d13689..b811d31 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -281,6 +281,7 @@ class Partition(val topic: String,
   def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult): Boolean = {
 inWriteLock(leaderIsrUpdateLock) {
   // check if this replica needs to be added to the ISR
+  Thread.sleep(10)
   leaderReplicaIfLocal match {
 case Some(leaderReplica) =>
   val replica = getReplica(replicaId).get
{code}

This makes me think that the controller is processing the preferred replica 
leader election before the restarted broker (the preferred replica leader) has 
joined isr, causing preferred replica leader election to fail and for the final 
zookeeper state validation to fail.

> Transient failure: 
> ControllerIntegrationTest.testPreferredReplicaLeaderElection
> ---
>
> Key: KAFKA-5175
> URL: https://issues.apache.org/jira/browse/KAFKA-5175
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Onur Karaman
>
> {code}
> java.lang.AssertionError: failed to get expected partition state upon broker 
> startup
>   at kafka.utils.TestUtils$.fail(TestUtils.scala:311)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:811)
>   at 
> kafka.controller.ControllerIntegrationTest.waitForPartitionState(ControllerIntegrationTest.scala:293)
>   at 
> kafka.controller.ControllerIntegrationTest.testPreferredReplicaLeaderElection(ControllerIntegrationTest.scala:211)
> {code}
> https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3497/testReport/kafka.controller/ControllerIntegrationTest/testPreferredReplicaLeaderElection/



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5175) Transient failure: ControllerIntegrationTest.testPreferredReplicaLeaderElection

2017-05-16 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011851#comment-16011851
 ] 

Onur Karaman commented on KAFKA-5175:
-

Cool. I can steadily reproduce this by inserting a long sleep in 
Partition.maybeExpandIsr:
{code}
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index 1d13689..b811d31 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -281,6 +281,7 @@ class Partition(val topic: String,
   def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult): Boolean = {
 inWriteLock(leaderIsrUpdateLock) {
   // check if this replica needs to be added to the ISR
+  Thread.sleep(10)
   leaderReplicaIfLocal match {
 case Some(leaderReplica) =>
   val replica = getReplica(replicaId).get
{code}

This makes me think that the controller is processing the preferred replica 
leader election before the restarted broker (the preferred replica leader) has 
joined isr, causing preferred replica leader election to fail and for the final 
zookeeper state validation to fail.

> Transient failure: 
> ControllerIntegrationTest.testPreferredReplicaLeaderElection
> ---
>
> Key: KAFKA-5175
> URL: https://issues.apache.org/jira/browse/KAFKA-5175
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Onur Karaman
>
> {code}
> java.lang.AssertionError: failed to get expected partition state upon broker 
> startup
>   at kafka.utils.TestUtils$.fail(TestUtils.scala:311)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:811)
>   at 
> kafka.controller.ControllerIntegrationTest.waitForPartitionState(ControllerIntegrationTest.scala:293)
>   at 
> kafka.controller.ControllerIntegrationTest.testPreferredReplicaLeaderElection(ControllerIntegrationTest.scala:211)
> {code}
> https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3497/testReport/kafka.controller/ControllerIntegrationTest/testPreferredReplicaLeaderElection/



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5175) Transient failure: ControllerIntegrationTest.testPreferredReplicaLeaderElection

2017-05-15 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011500#comment-16011500
 ] 

Onur Karaman commented on KAFKA-5175:
-

Thanks [~ijuma]. I've been staring at this and couldn't figure it out yet.

> Transient failure: 
> ControllerIntegrationTest.testPreferredReplicaLeaderElection
> ---
>
> Key: KAFKA-5175
> URL: https://issues.apache.org/jira/browse/KAFKA-5175
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Onur Karaman
>
> {code}
> java.lang.AssertionError: failed to get expected partition state upon broker 
> startup
>   at kafka.utils.TestUtils$.fail(TestUtils.scala:311)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:811)
>   at 
> kafka.controller.ControllerIntegrationTest.waitForPartitionState(ControllerIntegrationTest.scala:293)
>   at 
> kafka.controller.ControllerIntegrationTest.testPreferredReplicaLeaderElection(ControllerIntegrationTest.scala:211)
> {code}
> https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3497/testReport/kafka.controller/ControllerIntegrationTest/testPreferredReplicaLeaderElection/



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-3356) Remove ConsumerOffsetChecker, deprecated in 0.9, in 0.11

2017-05-15 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010823#comment-16010823
 ] 

Onur Karaman commented on KAFKA-3356:
-

I agree with [~jeffwidman].

> Remove ConsumerOffsetChecker, deprecated in 0.9, in 0.11
> 
>
> Key: KAFKA-3356
> URL: https://issues.apache.org/jira/browse/KAFKA-3356
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.2.0
>Reporter: Ashish Singh
>Assignee: Mickael Maison
>Priority: Blocker
> Fix For: 0.11.0.0
>
>
> ConsumerOffsetChecker is marked deprecated as of 0.9, should be removed in 
> 0.11.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5180) Transient failure: ControllerIntegrationTest.testControllerMoveIncrementsControllerEpoch

2017-05-12 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-5180:

Status: Patch Available  (was: In Progress)

> Transient failure: 
> ControllerIntegrationTest.testControllerMoveIncrementsControllerEpoch
> 
>
> Key: KAFKA-5180
> URL: https://issues.apache.org/jira/browse/KAFKA-5180
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Onur Karaman
>
> {code}
> Stacktrace org.I0Itec.zkclient.exception.ZkNoNodeException: 
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /controller_epoch
>   at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
>   at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:1001)
>   at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1100)
>   at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1095)
>   at kafka.utils.ZkUtils.readData(ZkUtils.scala:652)
>   at kafka.controller.ControllerIntegrationTest.$anonfun$testControllerMoveIncrementsControllerEpoch$2(ControllerIntegrationTest.scala:68)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:808)
>   at kafka.controller.ControllerIntegrationTest.testControllerMoveIncrementsControllerEpoch(ControllerIntegrationTest.scala:68)
> {code}
> cc [~onurkaraman]
> https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk8/detail/kafka-trunk-jdk8/1487/tests
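The failing read races with the creation of /controller_epoch by the new 
controller. As a hedged sketch, the test could poll with readDataMaybeNull (an 
existing ZkUtils method) instead of readData, assuming zkUtils and the expected 
epoch are in scope:
{code}
// Sketch only: tolerate the znode not existing yet rather than letting readData
// throw ZkNoNodeException, and keep polling until the expected epoch shows up.
TestUtils.waitUntilTrue(() =>
  zkUtils.readDataMaybeNull(ZkUtils.ControllerEpochPath)._1.exists(_.toInt == expectedEpoch),
  "timed out waiting for the controller epoch to be incremented")
{code}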



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Work started] (KAFKA-5180) Transient failure: ControllerIntegrationTest.testControllerMoveIncrementsControllerEpoch

2017-05-12 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-5180 started by Onur Karaman.
---
> Transient failure: 
> ControllerIntegrationTest.testControllerMoveIncrementsControllerEpoch
> 
>
> Key: KAFKA-5180
> URL: https://issues.apache.org/jira/browse/KAFKA-5180
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Onur Karaman
>
> {code}
> Stacktrace org.I0Itec.zkclient.exception.ZkNoNodeException: 
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /controller_epoch
>   at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
>   at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:1001)
>   at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1100)
>   at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1095)
>   at kafka.utils.ZkUtils.readData(ZkUtils.scala:652)
>   at kafka.controller.ControllerIntegrationTest.$anonfun$testControllerMoveIncrementsControllerEpoch$2(ControllerIntegrationTest.scala:68)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:808)
>   at kafka.controller.ControllerIntegrationTest.testControllerMoveIncrementsControllerEpoch(ControllerIntegrationTest.scala:68)
> {code}
> cc [~onurkaraman]
> https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk8/detail/kafka-trunk-jdk8/1487/tests



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (KAFKA-5120) Several controller metrics block if controller lock is held by another thread

2017-05-11 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman resolved KAFKA-5120.
-
Resolution: Fixed

KAFKA-5028 has been checked in so this should no longer be an issue.

> Several controller metrics block if controller lock is held by another thread
> -
>
> Key: KAFKA-5120
> URL: https://issues.apache.org/jira/browse/KAFKA-5120
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, metrics
>Affects Versions: 0.10.2.0
>Reporter: Tim Carey-Smith
>Priority: Minor
>
> We have been tracking latency issues surrounding queries to Controller 
> MBeans. Upon digging into the root causes, we discovered that several metrics 
> acquire the controller lock within the gauge. 
> The affected metrics are: 
> * {{ActiveControllerCount}}
> * {{OfflinePartitionsCount}}
> * {{PreferredReplicaImbalanceCount}}
> If the controller is currently holding the lock and a MBean request is 
> received, the thread executing the request will block until the controller 
> releases the lock. 
> We discovered this in a cluster where the controller was holding the lock for 
> extended periods of time for normal operations. We have documented this issue 
> in KAFKA-5116. 
> Several possible solutions exist: 
> * Remove the lock from inside these {{Gauge}} s. 
> * Store and update the metric values in {{AtomicLong}} s. 
> Modifying the {{ActiveControllerCount}} metric seems to be straight-forward 
> while the other 2 metrics seem to be more involved. 
> We're happy to contribute a patch, but wanted to discuss potential solutions 
> and their tradeoffs before proceeding. 
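As a rough illustration of the AtomicLong option (assuming the enclosing class 
mixes in kafka.metrics.KafkaMetricsGroup; the counter wiring and the metric 
recomputation are assumptions):
{code}
import java.util.concurrent.atomic.AtomicLong
import com.yammer.metrics.core.Gauge

// Sketch only: the controller thread keeps the value up to date whenever its
// state changes, so reading the MBean never has to take the controller lock.
private val offlinePartitionCount = new AtomicLong(0)

newGauge("OfflinePartitionsCount", new Gauge[Long] {
  def value(): Long = offlinePartitionCount.get()  // lock-free read
})

// Wherever the controller already mutates partition state under its lock:
// offlinePartitionCount.set(recomputedOfflinePartitionCount)
{code}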



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (KAFKA-3107) Error when trying to shut down auto balancing scheduler of controller

2017-05-11 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman resolved KAFKA-3107.
-
Resolution: Fixed

This problem should no longer exist after KAFKA-5028.

> Error when trying to shut down auto balancing scheduler of controller
> -
>
> Key: KAFKA-3107
> URL: https://issues.apache.org/jira/browse/KAFKA-3107
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
>Reporter: Flavio Junqueira
>
> We observed the following exception when a controller was shutting down:
> {noformat}
> [run] Error handling event ZkEvent[New session event sent to 
> kafka.controller.KafkaController$SessionExpirationListener@3278c211]
> java.lang.IllegalStateException: Kafka scheduler has not been started
> at kafka.utils.KafkaScheduler.ensureStarted(KafkaScheduler.scala:114)
> at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:86)
> at 
> kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:350)
> at 
> kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1108)
> at 
> kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1107)
> at 
> kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1107)
> at kafka.utils.Utils$.inLock(Utils.scala:535)
> at 
> kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1107)
> at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}
> The scheduler should have been started.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5197) add a tool analyzing zookeeper client performance across its various apis

2017-05-08 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-5197:

Status: Patch Available  (was: In Progress)

> add a tool analyzing zookeeper client performance across its various apis
> -
>
> Key: KAFKA-5197
> URL: https://issues.apache.org/jira/browse/KAFKA-5197
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> The raw zookeeper client offers various means of getting and setting znodes. 
> It would be useful to have a tool that lets you analyze the performance of 
> these apis.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Work started] (KAFKA-5197) add a tool analyzing zookeeper client performance across its various apis

2017-05-08 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-5197 started by Onur Karaman.
---
> add a tool analyzing zookeeper client performance across its various apis
> -
>
> Key: KAFKA-5197
> URL: https://issues.apache.org/jira/browse/KAFKA-5197
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> The raw zookeeper client offers various means of getting and setting znodes. 
> It would be useful to have a tool that lets you analyze the performance of 
> these apis.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5197) add a tool analyzing zookeeper client performance across its various apis

2017-05-08 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-5197:
---

 Summary: add a tool analyzing zookeeper client performance across 
its various apis
 Key: KAFKA-5197
 URL: https://issues.apache.org/jira/browse/KAFKA-5197
 Project: Kafka
  Issue Type: Sub-task
Reporter: Onur Karaman
Assignee: Onur Karaman


The raw zookeeper client offers various means of getting and setting znodes. It 
would be useful to have a tool that lets you analyze the performance of these 
apis.
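As a minimal sketch of what such a measurement could look like against the raw 
ZooKeeper client (the connect string, znode path and iteration count are 
arbitrary; the real tool would cover the other APIs and multi-ops as well):
{code}
import org.apache.zookeeper.ZooKeeper
import org.apache.zookeeper.ZooDefs.Ids
import org.apache.zookeeper.CreateMode

// Sketch only: time n synchronous getData calls against the raw ZooKeeper client.
val zk = new ZooKeeper("localhost:2181", 30000, null)
val path = "/zk-perf-test"
zk.create(path, Array[Byte](0), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)

val n = 10000
val start = System.nanoTime()
var i = 0
while (i < n) {
  zk.getData(path, false, null)
  i += 1
}
val elapsedMs = (System.nanoTime() - start) / 1000000.0
println(f"$n getData calls in $elapsedMs%.1f ms (${elapsedMs * 1000 / n}%.1f us/op)")
zk.close()
{code}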



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5099) Replica Deletion Regression from KIP-101

2017-05-07 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16000261#comment-16000261
 ] 

Onur Karaman commented on KAFKA-5099:
-

By the way, I reran the file descriptor experiment against both my patch as 
well as against git hash b611cfa5c0f4941a491781424afd9b699bdb894e (the commit 
before KIP-101: 0baea2ac13532981f3fea11e5dfc6da5aafaeaa8) on both macOS and 
Linux. The file descriptors stick around even after the index files have been 
removed; I gave up waiting after more than 10 minutes.

> Replica Deletion Regression from KIP-101
> 
>
> Key: KAFKA-5099
> URL: https://issues.apache.org/jira/browse/KAFKA-5099
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>Priority: Blocker
> Fix For: 0.11.0.0
>
>
> It appears that replica deletion regressed from KIP-101. Replica deletion 
> happens when a broker receives a StopReplicaRequest with delete=true. Ever 
> since KAFKA-1911, replica deletion has been async, meaning the broker 
> responds with a StopReplicaResponse simply after marking the replica 
> directory as staged for deletion. This marking happens by moving a data log 
> directory and its contents such as /tmp/kafka-logs1/t1-0 to a marked 
> directory like /tmp/kafka-logs1/t1-0.8c9c4c0c61c44cc59ebeb00075a2a07f-delete, 
> acting as a soft-delete. A scheduled thread later actually deletes the data. 
> It appears that the regression occurs while the scheduled thread is actually 
> trying to delete the data, which means the controller considers operations 
> such as partition reassignment and topic deletion complete. But if you look 
> at the log4j logs and data logs, you'll find that the soft-deleted data logs 
> don't actually get deleted. It seems that restarting the broker 
> actually allows the soft-deleted directories to get deleted.
> Here's the setup:
> {code}
> > ./bin/zookeeper-server-start.sh config/zookeeper.properties
> > export LOG_DIR=logs0 && ./bin/kafka-server-start.sh 
> > config/server0.properties
> > export LOG_DIR=logs1 && ./bin/kafka-server-start.sh 
> > config/server1.properties
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t0 
> > --replica-assignment 1:0
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t1 
> > --replica-assignment 1:0
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic t0
> > cat p.txt
> {"partitions":
>  [
>   {"topic": "t1", "partition": 0, "replicas": [0] }
>  ],
> "version":1
> }
> > ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 
> > --reassignment-json-file p.txt --execute
> {code}
> Here are sample logs:
> {code}
> [2017-04-20 17:46:54,801] INFO [ReplicaFetcherManager on broker 1] Removed 
> fetcher for partitions t0-0 (kafka.server.ReplicaFetcherManager)
> [2017-04-20 17:46:54,814] INFO Log for partition t0-0 is renamed to 
> /tmp/kafka-logs1/t0-0.bbc8fa126e3e4ff787f6b68d158ab771-delete and is 
> scheduled for deletion (kafka.log.LogManager)
> [2017-04-20 17:47:27,585] INFO Deleting index 
> /tmp/kafka-logs1/t0-0.bbc8fa126e3e4ff787f6b68d158ab771-delete/.index
>  (kafka.log.OffsetIndex)
> [2017-04-20 17:47:27,586] INFO Deleting index 
> /tmp/kafka-logs1/t0-0/.timeindex (kafka.log.TimeIndex)
> [2017-04-20 17:47:27,587] ERROR Exception in deleting 
> Log(/tmp/kafka-logs1/t0-0.bbc8fa126e3e4ff787f6b68d158ab771-delete). Moving it 
> to the end of the queue. (kafka.log.LogManager)
> java.io.FileNotFoundException: 
> /tmp/kafka-logs1/t0-0/leader-epoch-checkpoint.tmp (No such file or directory)
>   at java.io.FileOutputStream.open0(Native Method)
>   at java.io.FileOutputStream.open(FileOutputStream.java:270)
>   at java.io.FileOutputStream.(FileOutputStream.java:213)
>   at java.io.FileOutputStream.(FileOutputStream.java:162)
>   at kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:41)
>   at 
> kafka.server.checkpoints.LeaderEpochCheckpointFile.write(LeaderEpochCheckpointFile.scala:61)
>   at 
> kafka.server.epoch.LeaderEpochFileCache.kafka$server$epoch$LeaderEpochFileCache$$flush(LeaderEpochFileCache.scala:178)
>   at 
> kafka.server.epoch.LeaderEpochFileCache$$anonfun$clear$1.apply$mcV$sp(LeaderEpochFileCache.scala:161)
>   at 
> kafka.server.epoch.LeaderEpochFileCache$$anonfun$clear$1.apply(LeaderEpochFileCache.scala:159)
>   at 
> kafka.server.epoch.LeaderEpochFileCache$$anonfun$clear$1.apply(LeaderEpochFileCache.scala:159)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221)
>   at 
> kafka.server.epoch.LeaderEpochFileCache.clear(LeaderEpochFileCache.scala:159)
>   at kafka.log.Log.delete(Log.scala:1051)
>   at 
> 

[jira] [Commented] (KAFKA-5099) Replica Deletion Regression from KIP-101

2017-05-05 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15999200#comment-15999200
 ] 

Onur Karaman commented on KAFKA-5099:
-

Hi [~junrao]. Thanks for figuring it out. I gave your solution a shot and it 
does seem to delete the data now. However, I noticed something which I'm not 
sure is normal. While the files get deleted successfully now, it looks like the 
process holds onto the file descriptors corresponding to the index and 
timeindex files in their soft-delete directories for a few minutes. Is this 
somehow related to them being mmap'd?

So there's a window of time where I see:
{code}
> l /tmp/kafka-logs*/t*
ls: /tmp/kafka-logs*/t*: No such file or directory
> lsof -a -p 42214 | grep "/tmp/kafka-logs"
java  42214 okaraman  txt   REG  1,4  0  86940741 /private/tmp/kafka-logs0/t0-0.f87568e7c6db40c7a898719376a79f8b-delete/.index
java  42214 okaraman  txt   REG  1,4  0  86940744 /private/tmp/kafka-logs0/t0-0.f87568e7c6db40c7a898719376a79f8b-delete/.timeindex
java  42214 okaraman  123u  REG  1,4  0  86940706 /private/tmp/kafka-logs0/.lock
{code}

The file descriptor goes away after a few minutes:
{code}
> l /tmp/kafka-logs*/t*
ls: /tmp/kafka-logs*/t*: No such file or directory
> lsof -a -p 42214 | grep "/tmp/kafka-logs"
java  42214 okaraman  123u  REG  1,4  0  86940706 /private/tmp/kafka-logs0/.lock
{code}

> Replica Deletion Regression from KIP-101
> 
>
> Key: KAFKA-5099
> URL: https://issues.apache.org/jira/browse/KAFKA-5099
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>Priority: Blocker
> Fix For: 0.11.0.0
>
>
> It appears that replica deletion regressed from KIP-101. Replica deletion 
> happens when a broker receives a StopReplicaRequest with delete=true. Ever 
> since KAFKA-1911, replica deletion has been async, meaning the broker 
> responds with a StopReplicaResponse simply after marking the replica 
> directory as staged for deletion. This marking happens by moving a data log 
> directory and its contents such as /tmp/kafka-logs1/t1-0 to a marked 
> directory like /tmp/kafka-logs1/t1-0.8c9c4c0c61c44cc59ebeb00075a2a07f-delete, 
> acting as a soft-delete. A scheduled thread later actually deletes the data. 
> It appears that the regression occurs while the scheduled thread is actually 
> trying to delete the data, which means the controller considers operations 
> such as partition reassignment and topic deletion complete. But if you look 
> at the log4j logs and data logs, you'll find that the soft-deleted data logs 
> don't actually get deleted. It seems that restarting the broker 
> actually allows the soft-deleted directories to get deleted.
> Here's the setup:
> {code}
> > ./bin/zookeeper-server-start.sh config/zookeeper.properties
> > export LOG_DIR=logs0 && ./bin/kafka-server-start.sh 
> > config/server0.properties
> > export LOG_DIR=logs1 && ./bin/kafka-server-start.sh 
> > config/server1.properties
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t0 
> > --replica-assignment 1:0
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t1 
> > --replica-assignment 1:0
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic t0
> > cat p.txt
> {"partitions":
>  [
>   {"topic": "t1", "partition": 0, "replicas": [0] }
>  ],
> "version":1
> }
> > ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 
> > --reassignment-json-file p.txt --execute
> {code}
> Here are sample logs:
> {code}
> [2017-04-20 17:46:54,801] INFO [ReplicaFetcherManager on broker 1] Removed 
> fetcher for partitions t0-0 (kafka.server.ReplicaFetcherManager)
> [2017-04-20 17:46:54,814] INFO Log for partition t0-0 is renamed to 
> /tmp/kafka-logs1/t0-0.bbc8fa126e3e4ff787f6b68d158ab771-delete and is 
> scheduled for deletion (kafka.log.LogManager)
> [2017-04-20 17:47:27,585] INFO Deleting index 
> /tmp/kafka-logs1/t0-0.bbc8fa126e3e4ff787f6b68d158ab771-delete/.index
>  (kafka.log.OffsetIndex)
> [2017-04-20 17:47:27,586] INFO Deleting index 
> /tmp/kafka-logs1/t0-0/.timeindex (kafka.log.TimeIndex)
> [2017-04-20 17:47:27,587] ERROR Exception in deleting 
> Log(/tmp/kafka-logs1/t0-0.bbc8fa126e3e4ff787f6b68d158ab771-delete). Moving it 
> to the end of the queue. (kafka.log.LogManager)
> java.io.FileNotFoundException: 
> /tmp/kafka-logs1/t0-0/leader-epoch-checkpoint.tmp (No such file or directory)
>   at java.io.FileOutputStream.open0(Native Method)
>   at java.io.FileOutputStream.open(FileOutputStream.java:270)
>   at java.io.FileOutputStream.(FileOutputStream.java:213)
>   at 

[jira] [Assigned] (KAFKA-5180) Transient failure: ControllerIntegrationTest.testControllerMoveIncrementsControllerEpoch

2017-05-05 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman reassigned KAFKA-5180:
---

Assignee: Onur Karaman

> Transient failure: 
> ControllerIntegrationTest.testControllerMoveIncrementsControllerEpoch
> 
>
> Key: KAFKA-5180
> URL: https://issues.apache.org/jira/browse/KAFKA-5180
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Onur Karaman
>
> {code}
> Stacktrace org.I0Itec.zkclient.exception.ZkNoNodeException: 
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /controller_epoch
>   at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
>   at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:1001)
>   at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1100)
>   at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1095)
>   at kafka.utils.ZkUtils.readData(ZkUtils.scala:652)
>   at kafka.controller.ControllerIntegrationTest.$anonfun$testControllerMoveIncrementsControllerEpoch$2(ControllerIntegrationTest.scala:68)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:808)
>   at kafka.controller.ControllerIntegrationTest.testControllerMoveIncrementsControllerEpoch(ControllerIntegrationTest.scala:68)
> {code}
> cc [~onurkaraman]
> https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk8/detail/kafka-trunk-jdk8/1487/tests



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-5175) Transient failure: ControllerIntegrationTest.testPreferredReplicaLeaderElection

2017-05-04 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman reassigned KAFKA-5175:
---

Assignee: Onur Karaman

> Transient failure: 
> ControllerIntegrationTest.testPreferredReplicaLeaderElection
> ---
>
> Key: KAFKA-5175
> URL: https://issues.apache.org/jira/browse/KAFKA-5175
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Onur Karaman
>
> {code}
> java.lang.AssertionError: failed to get expected partition state upon broker 
> startup
>   at kafka.utils.TestUtils$.fail(TestUtils.scala:311)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:811)
>   at 
> kafka.controller.ControllerIntegrationTest.waitForPartitionState(ControllerIntegrationTest.scala:293)
>   at 
> kafka.controller.ControllerIntegrationTest.testPreferredReplicaLeaderElection(ControllerIntegrationTest.scala:211)
> {code}
> https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3497/testReport/kafka.controller/ControllerIntegrationTest/testPreferredReplicaLeaderElection/



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5107) remove preferred replica election state from ControllerContext

2017-04-27 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-5107:

Status: Patch Available  (was: In Progress)

> remove preferred replica election state from ControllerContext
> --
>
> Key: KAFKA-5107
> URL: https://issues.apache.org/jira/browse/KAFKA-5107
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> KAFKA-5028 moves the controller to a single-threaded model, so we would no 
> longer have work interleaved with preferred replica leader election, 
> meaning we don't need to keep its state.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Work started] (KAFKA-5107) remove preferred replica election state from ControllerContext

2017-04-27 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-5107 started by Onur Karaman.
---
> remove preferred replica election state from ControllerContext
> --
>
> Key: KAFKA-5107
> URL: https://issues.apache.org/jira/browse/KAFKA-5107
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> KAFKA-5028 moves the controller to a single-threaded model, so we would no 
> longer have work interleaved with preferred replica leader election, 
> meaning we don't need to keep its state.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5028) convert kafka controller to a single-threaded event queue model

2017-04-27 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15987101#comment-15987101
 ] 

Onur Karaman commented on KAFKA-5028:
-

Shortly after check-in, I noticed a bug: the patch has 
onPreferredReplicaElection keep the line calling 
topicDeletionManager.markTopicIneligibleForDeletion but removes the line 
calling topicDeletionManager.resumeDeletionForTopics. I will address this in 
KAFKA-5107

> convert kafka controller to a single-threaded event queue model
> ---
>
> Key: KAFKA-5028
> URL: https://issues.apache.org/jira/browse/KAFKA-5028
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Fix For: 0.11.0.0
>
>
> The goal of this ticket is to improve controller maintainability by 
> simplifying the controller's concurrency semantics. The controller code has a 
> lot of shared state between several threads using several concurrency 
> primitives. This makes the code hard to reason about.
> This ticket proposes we convert the controller to a single-threaded event 
> queue model. We add a new controller thread which processes events held in an 
> event queue. Note that this does not mean we get rid of all threads used by 
> the controller. We merely delegate all work that interacts with controller 
> local state to this single thread. With only a single thread accessing and 
> modifying the controller local state, we no longer need to worry about 
> concurrent access, which means we can get rid of the various concurrency 
> primitives used throughout the controller.
> Performance is expected to match existing behavior since the bulk of the 
> existing controller work today already happens sequentially in the ZkClient’s 
> single ZkEventThread.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5120) Several controller metrics block if controller lock is held by another thread

2017-04-24 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15982118#comment-15982118
 ] 

Onur Karaman commented on KAFKA-5120:
-

This will be fixed in KAFKA-5028. It chooses the latter approach you mentioned 
where the metrics would just read an atomic variable holding the precomputed 
metric values.

> Several controller metrics block if controller lock is held by another thread
> -
>
> Key: KAFKA-5120
> URL: https://issues.apache.org/jira/browse/KAFKA-5120
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, metrics
>Affects Versions: 0.10.2.0
>Reporter: Tim Carey-Smith
>Priority: Minor
>
> We have been tracking latency issues surrounding queries to Controller 
> MBeans. Upon digging into the root causes, we discovered that several metrics 
> acquire the controller lock within the gauge. 
> The affected metrics are: 
> * {{ActiveControllerCount}}
> * {{OfflinePartitionsCount}}
> * {{PreferredReplicaImbalanceCount}}
> If the controller is currently holding the lock and a MBean request is 
> received, the thread executing the request will block until the controller 
> releases the lock. 
> We discovered this in a cluster where the controller was holding the lock for 
> extended periods of time for normal operations. We have documented this issue 
> in KAFKA-5116. 
> Several possible solutions exist: 
> * Remove the lock from inside these {{Gauge}}s. 
> * Store and update the metric values in {{AtomicLong}}s. 
> Modifying the {{ActiveControllerCount}} metric seems to be straight-forward 
> while the other 2 metrics seem to be more involved. 
> We're happy to contribute a patch, but wanted to discuss potential solutions 
> and their tradeoffs before proceeding. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5107) remove preferred replica election state from ControllerContext

2017-04-21 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-5107:

Summary: remove preferred replica election state from ControllerContext  
(was: investigate removing preferred replica leader election state from 
ControllerContext)

> remove preferred replica election state from ControllerContext
> --
>
> Key: KAFKA-5107
> URL: https://issues.apache.org/jira/browse/KAFKA-5107
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> KAFKA-5028 moves the controller to a single-threaded model, so we would no 
> longer have work interleaved with preferred replica leader election, 
> meaning we don't need to keep its state.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5107) investigate removing preferred replica leader election state from ControllerContext

2017-04-21 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-5107:
---

 Summary: investigate removing preferred replica leader election 
state from ControllerContext
 Key: KAFKA-5107
 URL: https://issues.apache.org/jira/browse/KAFKA-5107
 Project: Kafka
  Issue Type: Sub-task
Reporter: Onur Karaman
Assignee: Onur Karaman


KAFKA-5028 moves the controller to a single-threaded model, so we would no 
longer have work interleaved with preferred replica leader election, meaning 
we don't need to keep its state.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5099) Replica Deletion Regression from KIP-101

2017-04-20 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977902#comment-15977902
 ] 

Onur Karaman commented on KAFKA-5099:
-

My initial guess is that the new epoch checkpoint file isn't being moved to the 
soft-deleted directory correctly.
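
If that guess is right, the failure reduces to a stale-path problem: something captured the original log directory path and keeps writing the checkpoint there after the directory has been renamed. A contrived, self-contained sketch of that failure mode (illustrative only, not the LeaderEpochFileCache code) is:

{code}
import java.io.File
import java.nio.file.{Files, StandardCopyOption}

object StaleCheckpointPathSketch extends App {
  val logDir = Files.createTempDirectory("t0-0").toFile
  // The checkpoint path is captured once, against the original directory.
  val checkpoint = new File(logDir, "leader-epoch-checkpoint.tmp")
  Files.write(checkpoint.toPath, "0".getBytes)

  // Soft delete: rename the whole directory, as the async delete does.
  val renamed = new File(logDir.getParent, logDir.getName + ".uuid-delete")
  Files.move(logDir.toPath, renamed.toPath, StandardCopyOption.ATOMIC_MOVE)

  // A later write through the stale path fails because the original directory
  // no longer exists; the ticket's FileNotFoundException is the same situation
  // hit via FileOutputStream.
  try Files.write(checkpoint.toPath, "1".getBytes)
  catch { case e: java.io.IOException => println(s"write failed as expected: $e") }
}
{code}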

> Replica Deletion Regression from KIP-101
> 
>
> Key: KAFKA-5099
> URL: https://issues.apache.org/jira/browse/KAFKA-5099
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> It appears that replica deletion regressed from KIP-101. Replica deletion 
> happens when a broker receives a StopReplicaRequest with delete=true. Ever 
> since KAFKA-1911, replica deletion has been async, meaning the broker 
> responds with a StopReplicaResponse simply after marking the replica 
> directory as staged for deletion. This marking happens by moving a data log 
> directory and its contents such as /tmp/kafka-logs1/t1-0 to a marked 
> directory like /tmp/kafka-logs1/t1-0.8c9c4c0c61c44cc59ebeb00075a2a07f-delete, 
> acting as a soft-delete. A scheduled thread later actually deletes the data. 
> It appears that the regression occurs while the scheduled thread is actually 
> trying to delete the data, which means the controller considers operations 
> such as partition reassignment and topic deletion complete. But if you look 
> at the log4j logs and data logs, you'll find that the soft-deleted data logs 
> don't actually get deleted. It seems that restarting the broker 
> actually allows for the soft-deleted directories to get deleted.
> Here's the setup:
> {code}
> > ./bin/zookeeper-server-start.sh config/zookeeper.properties
> > export LOG_DIR=logs0 && ./bin/kafka-server-start.sh 
> > config/server0.properties
> > export LOG_DIR=logs1 && ./bin/kafka-server-start.sh 
> > config/server1.properties
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t0 
> > --replica-assignment 1:0
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t1 
> > --replica-assignment 1:0
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic t0
> > cat p.txt
> {"partitions":
>  [
>   {"topic": "t1", "partition": 0, "replicas": [0] }
>  ],
> "version":1
> }
> > ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 
> > --reassignment-json-file p.txt --execute
> {code}
> Here are sample logs:
> {code}
> [2017-04-20 17:46:54,801] INFO [ReplicaFetcherManager on broker 1] Removed 
> fetcher for partitions t0-0 (kafka.server.ReplicaFetcherManager)
> [2017-04-20 17:46:54,814] INFO Log for partition t0-0 is renamed to 
> /tmp/kafka-logs1/t0-0.bbc8fa126e3e4ff787f6b68d158ab771-delete and is 
> scheduled for deletion (kafka.log.LogManager)
> [2017-04-20 17:47:27,585] INFO Deleting index 
> /tmp/kafka-logs1/t0-0.bbc8fa126e3e4ff787f6b68d158ab771-delete/.index
>  (kafka.log.OffsetIndex)
> [2017-04-20 17:47:27,586] INFO Deleting index 
> /tmp/kafka-logs1/t0-0/.timeindex (kafka.log.TimeIndex)
> [2017-04-20 17:47:27,587] ERROR Exception in deleting 
> Log(/tmp/kafka-logs1/t0-0.bbc8fa126e3e4ff787f6b68d158ab771-delete). Moving it 
> to the end of the queue. (kafka.log.LogManager)
> java.io.FileNotFoundException: 
> /tmp/kafka-logs1/t0-0/leader-epoch-checkpoint.tmp (No such file or directory)
>   at java.io.FileOutputStream.open0(Native Method)
>   at java.io.FileOutputStream.open(FileOutputStream.java:270)
>   at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
>   at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
>   at kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:41)
>   at 
> kafka.server.checkpoints.LeaderEpochCheckpointFile.write(LeaderEpochCheckpointFile.scala:61)
>   at 
> kafka.server.epoch.LeaderEpochFileCache.kafka$server$epoch$LeaderEpochFileCache$$flush(LeaderEpochFileCache.scala:178)
>   at 
> kafka.server.epoch.LeaderEpochFileCache$$anonfun$clear$1.apply$mcV$sp(LeaderEpochFileCache.scala:161)
>   at 
> kafka.server.epoch.LeaderEpochFileCache$$anonfun$clear$1.apply(LeaderEpochFileCache.scala:159)
>   at 
> kafka.server.epoch.LeaderEpochFileCache$$anonfun$clear$1.apply(LeaderEpochFileCache.scala:159)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221)
>   at 
> kafka.server.epoch.LeaderEpochFileCache.clear(LeaderEpochFileCache.scala:159)
>   at kafka.log.Log.delete(Log.scala:1051)
>   at 
> kafka.log.LogManager.kafka$log$LogManager$$deleteLogs(LogManager.scala:442)
>   at 
> kafka.log.LogManager$$anonfun$startup$5.apply$mcV$sp(LogManager.scala:241)
>   at 
> kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:57)
>   at 

[jira] [Created] (KAFKA-5099) Replica Deletion Regression from KIP-101

2017-04-20 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-5099:
---

 Summary: Replica Deletion Regression from KIP-101
 Key: KAFKA-5099
 URL: https://issues.apache.org/jira/browse/KAFKA-5099
 Project: Kafka
  Issue Type: Bug
Reporter: Onur Karaman
Assignee: Onur Karaman


It appears that replica deletion regressed from KIP-101. Replica deletion 
happens when a broker receives a StopReplicaRequest with delete=true. Ever 
since KAFKA-1911, replica deletion has been async, meaning the broker responds 
with a StopReplicaResponse simply after marking the replica directory as staged 
for deletion. This marking happens by moving a data log directory and its 
contents such as /tmp/kafka-logs1/t1-0 to a marked directory like 
/tmp/kafka-logs1/t1-0.8c9c4c0c61c44cc59ebeb00075a2a07f-delete, acting as a 
soft-delete. A scheduled thread later actually deletes the data. It appears 
that the regression occurs while the scheduled thread is actually trying to 
delete the data, which means the controller considers operations such as 
partition reassignment and topic deletion complete. But if you look at the 
log4j logs and data logs, you'll find that the soft-deleted data logs don't 
actually get deleted. It seems that restarting the broker actually allows 
for the soft-deleted directories to get deleted.

Here's the setup:
{code}
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
> export LOG_DIR=logs0 && ./bin/kafka-server-start.sh config/server0.properties
> export LOG_DIR=logs1 && ./bin/kafka-server-start.sh config/server1.properties
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t0 
> --replica-assignment 1:0
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t1 
> --replica-assignment 1:0
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic t0
> cat p.txt
{"partitions":
 [
  {"topic": "t1", "partition": 0, "replicas": [0] }
 ],
"version":1
}
> ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 
> --reassignment-json-file p.txt --execute
{code}
Here are sample logs:
{code}
[2017-04-20 17:46:54,801] INFO [ReplicaFetcherManager on broker 1] Removed 
fetcher for partitions t0-0 (kafka.server.ReplicaFetcherManager)
[2017-04-20 17:46:54,814] INFO Log for partition t0-0 is renamed to 
/tmp/kafka-logs1/t0-0.bbc8fa126e3e4ff787f6b68d158ab771-delete and is scheduled 
for deletion (kafka.log.LogManager)
[2017-04-20 17:47:27,585] INFO Deleting index 
/tmp/kafka-logs1/t0-0.bbc8fa126e3e4ff787f6b68d158ab771-delete/.index
 (kafka.log.OffsetIndex)
[2017-04-20 17:47:27,586] INFO Deleting index 
/tmp/kafka-logs1/t0-0/.timeindex (kafka.log.TimeIndex)
[2017-04-20 17:47:27,587] ERROR Exception in deleting 
Log(/tmp/kafka-logs1/t0-0.bbc8fa126e3e4ff787f6b68d158ab771-delete). Moving it 
to the end of the queue. (kafka.log.LogManager)
java.io.FileNotFoundException: 
/tmp/kafka-logs1/t0-0/leader-epoch-checkpoint.tmp (No such file or directory)
  at java.io.FileOutputStream.open0(Native Method)
  at java.io.FileOutputStream.open(FileOutputStream.java:270)
  at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
  at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
  at kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:41)
  at 
kafka.server.checkpoints.LeaderEpochCheckpointFile.write(LeaderEpochCheckpointFile.scala:61)
  at 
kafka.server.epoch.LeaderEpochFileCache.kafka$server$epoch$LeaderEpochFileCache$$flush(LeaderEpochFileCache.scala:178)
  at 
kafka.server.epoch.LeaderEpochFileCache$$anonfun$clear$1.apply$mcV$sp(LeaderEpochFileCache.scala:161)
  at 
kafka.server.epoch.LeaderEpochFileCache$$anonfun$clear$1.apply(LeaderEpochFileCache.scala:159)
  at 
kafka.server.epoch.LeaderEpochFileCache$$anonfun$clear$1.apply(LeaderEpochFileCache.scala:159)
  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
  at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221)
  at 
kafka.server.epoch.LeaderEpochFileCache.clear(LeaderEpochFileCache.scala:159)
  at kafka.log.Log.delete(Log.scala:1051)
  at kafka.log.LogManager.kafka$log$LogManager$$deleteLogs(LogManager.scala:442)
  at kafka.log.LogManager$$anonfun$startup$5.apply$mcV$sp(LogManager.scala:241)
  at 
kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:57)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
  at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
  at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
  at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at 

[jira] [Created] (KAFKA-5083) always leave the last surviving member of the ISR in ZK

2017-04-18 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-5083:
---

 Summary: always leave the last surviving member of the ISR in ZK
 Key: KAFKA-5083
 URL: https://issues.apache.org/jira/browse/KAFKA-5083
 Project: Kafka
  Issue Type: Sub-task
Reporter: Onur Karaman
Assignee: Onur Karaman


Currently we erase ISR membership if the replica to be removed from the ISR is 
the last surviving member of the ISR and unclean leader election is enabled for 
the corresponding topic.

We should investigate leaving the last replica in ISR in ZK, independent of 
whether unclean leader election is enabled or not. That way, if people 
re-disabled unclean leader election, we can still try to elect the leader from 
the last in-sync replica.
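
A toy sketch of the proposed shrink rule (a made-up function, not the actual partition state machine code): the ISR never shrinks below one member, regardless of the unclean leader election setting.

{code}
object IsrShrinkSketch {
  // Keep the last surviving replica in the ISR instead of erasing membership.
  def shrinkIsr(isr: Set[Int], removedReplica: Int): Set[Int] = {
    val remaining = isr - removedReplica
    if (remaining.isEmpty) isr else remaining
  }

  def main(args: Array[String]): Unit = {
    println(shrinkIsr(Set(1, 2), 1)) // Set(2)
    println(shrinkIsr(Set(1), 1))    // Set(1): the last member stays in ZK
  }
}
{code}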



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5069) add controller integration tests

2017-04-14 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-5069:

Status: Patch Available  (was: In Progress)

> add controller integration tests
> 
>
> Key: KAFKA-5069
> URL: https://issues.apache.org/jira/browse/KAFKA-5069
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> Test the various controller protocols by observing zookeeper and broker state.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5028) convert kafka controller to a single-threaded event queue model

2017-04-14 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-5028:

Status: Patch Available  (was: In Progress)

> convert kafka controller to a single-threaded event queue model
> ---
>
> Key: KAFKA-5028
> URL: https://issues.apache.org/jira/browse/KAFKA-5028
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> The goal of this ticket is to improve controller maintainability by 
> simplifying the controller's concurrency semantics. The controller code has a 
> lot of shared state between several threads using several concurrency 
> primitives. This makes the code hard to reason about.
> This ticket proposes we convert the controller to a single-threaded event 
> queue model. We add a new controller thread which processes events held in an 
> event queue. Note that this does not mean we get rid of all threads used by 
> the controller. We merely delegate all work that interacts with controller 
> local state to this single thread. With only a single thread accessing and 
> modifying the controller local state, we no longer need to worry about 
> concurrent access, which means we can get rid of the various concurrency 
> primitives used throughout the controller.
> Performance is expected to match existing behavior since the bulk of the 
> existing controller work today already happens sequentially in the ZkClient’s 
> single ZkEventThread.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Work started] (KAFKA-5069) add controller integration tests

2017-04-13 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-5069 started by Onur Karaman.
---
> add controller integration tests
> 
>
> Key: KAFKA-5069
> URL: https://issues.apache.org/jira/browse/KAFKA-5069
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> Test the various controller protocols by observing zookeeper and broker state.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5069) add controller integration tests

2017-04-13 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-5069:
---

 Summary: add controller integration tests
 Key: KAFKA-5069
 URL: https://issues.apache.org/jira/browse/KAFKA-5069
 Project: Kafka
  Issue Type: Sub-task
Reporter: Onur Karaman
Assignee: Onur Karaman


Test the various controller protocols by observing zookeeper and broker state.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Work started] (KAFKA-5028) convert kafka controller to a single-threaded event queue model

2017-04-05 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-5028 started by Onur Karaman.
---
> convert kafka controller to a single-threaded event queue model
> ---
>
> Key: KAFKA-5028
> URL: https://issues.apache.org/jira/browse/KAFKA-5028
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> The goal of this ticket is to improve controller maintainability by 
> simplifying the controller's concurrency semantics. The controller code has a 
> lot of shared state between several threads using several concurrency 
> primitives. This makes the code hard to reason about.
> This ticket proposes we convert the controller to a single-threaded event 
> queue model. We add a new controller thread which processes events held in an 
> event queue. Note that this does not mean we get rid of all threads used by 
> the controller. We merely delegate all work that interacts with controller 
> local state to this single thread. With only a single thread accessing and 
> modifying the controller local state, we no longer need to worry about 
> concurrent access, which means we can get rid of the various concurrency 
> primitives used throughout the controller.
> Performance is expected to match existing behavior since the bulk of the 
> existing controller work today already happens sequentially in the ZkClient’s 
> single ZkEventThread.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5028) convert kafka controller to a single-threaded event queue model

2017-04-05 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-5028:

Summary: convert kafka controller to a single-threaded event queue model  
(was: Convert Kafka Controller to a single-threaded event queue model)

> convert kafka controller to a single-threaded event queue model
> ---
>
> Key: KAFKA-5028
> URL: https://issues.apache.org/jira/browse/KAFKA-5028
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> The goal of this ticket is to improve controller maintainability by 
> simplifying the controller's concurrency semantics. The controller code has a 
> lot of shared state between several threads using several concurrency 
> primitives. This makes the code hard to reason about.
> This ticket proposes we convert the controller to a single-threaded event 
> queue model. We add a new controller thread which processes events held in an 
> event queue. Note that this does not mean we get rid of all threads used by 
> the controller. We merely delegate all work that interacts with controller 
> local state to this single thread. With only a single thread accessing and 
> modifying the controller local state, we no longer need to worry about 
> concurrent access, which means we can get rid of the various concurrency 
> primitives used throughout the controller.
> Performance is expected to match existing behavior since the bulk of the 
> existing controller work today already happens sequentially in the ZkClient’s 
> single ZkEventThread.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5028) Convert Kafka Controller to a single-threaded event queue model

2017-04-05 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-5028:
---

 Summary: Convert Kafka Controller to a single-threaded event queue 
model
 Key: KAFKA-5028
 URL: https://issues.apache.org/jira/browse/KAFKA-5028
 Project: Kafka
  Issue Type: Sub-task
Reporter: Onur Karaman
Assignee: Onur Karaman


The goal of this ticket is to improve controller maintainability by simplifying 
the controller's concurrency semantics. The controller code has a lot of shared 
state between several threads using several concurrency primitives. This makes 
the code hard to reason about.

This ticket proposes we convert the controller to a single-threaded event queue 
model. We add a new controller thread which processes events held in an event 
queue. Note that this does not mean we get rid of all threads used by the 
controller. We merely delegate all work that interacts with controller local 
state to this single thread. With only a single thread accessing and modifying 
the controller local state, we no longer need to worry about concurrent access, 
which means we can get rid of the various concurrency primitives used 
throughout the controller.

Performance is expected to match existing behavior since the bulk of the 
existing controller work today already happens sequentially in the ZkClient’s 
single ZkEventThread.
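
A minimal sketch of the event queue model, with placeholder event types only (the names and handler bodies below are illustrative, not the real KafkaController events):

{code}
import java.util.concurrent.LinkedBlockingQueue

// Illustrative only: event types are placeholders, not the real controller events.
sealed trait ControllerEvent { def process(): Unit }
case class BrokerChange(brokerIds: Set[Int]) extends ControllerEvent {
  def process(): Unit = println(s"handling broker change: $brokerIds")
}
case object PreferredReplicaLeaderElection extends ControllerEvent {
  def process(): Unit = println("handling preferred replica leader election")
}
case object Shutdown extends ControllerEvent {
  def process(): Unit = ()
}

// A single thread drains the queue, so event handlers never run concurrently
// and need no locks around controller-local state.
class ControllerEventThread(queue: LinkedBlockingQueue[ControllerEvent])
  extends Thread("controller-event-thread") {
  override def run(): Unit = {
    var running = true
    while (running) {
      queue.take() match {
        case Shutdown => running = false
        case event    => event.process()
      }
    }
  }
}

object EventQueueSketch extends App {
  val queue = new LinkedBlockingQueue[ControllerEvent]()
  val thread = new ControllerEventThread(queue)
  thread.start()
  // Listener callbacks (e.g. from ZkClient's ZkEventThread) would only enqueue:
  queue.put(BrokerChange(Set(0, 1)))
  queue.put(PreferredReplicaLeaderElection)
  queue.put(Shutdown)
  thread.join()
}
{code}

The confinement is the point of the design: callbacks only enqueue, and the one event thread is the only code that dequeues and mutates controller-local state.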



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5027) Kafka Controller Redesign

2017-04-05 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-5027:
---

 Summary: Kafka Controller Redesign
 Key: KAFKA-5027
 URL: https://issues.apache.org/jira/browse/KAFKA-5027
 Project: Kafka
  Issue Type: Improvement
Reporter: Onur Karaman
Assignee: Onur Karaman


The goal of this redesign is to improve controller performance, controller 
maintainability, and cluster reliability.

Documentation regarding what's being considered can be found 
[here|https://docs.google.com/document/d/1rLDmzDOGQQeSiMANP0rC2RYp_L7nUGHzFD9MQISgXYM].



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4959) remove controller concurrent access to non-threadsafe NetworkClient, Selector, and SSLEngine

2017-03-27 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-4959:

Status: Patch Available  (was: In Progress)

> remove controller concurrent access to non-threadsafe NetworkClient, 
> Selector, and SSLEngine
> 
>
> Key: KAFKA-4959
> URL: https://issues.apache.org/jira/browse/KAFKA-4959
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> This brought down a cluster by causing continuous controller moves.
> ZkClient's ZkEventThread and a RequestSendThread can concurrently use objects 
> that aren't thread-safe:
> * Selector
> * NetworkClient
> * SSLEngine (this was the big one for us. We turn on SSL for interbroker 
> communication).
> As per the "Concurrency Notes" section from the [SSLEngine 
> javadoc|https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html]:
> bq. two threads must not attempt to call the same method (either wrap() or 
> unwrap()) concurrently
> SSLEngine.wrap gets called in:
> * SslTransportLayer.write
> * SslTransportLayer.handshake
> * SslTransportLayer.close
> It turns out that the ZkEventThread and RequestSendThread can concurrently 
> call SSLEngine.wrap:
> * ZkEventThread calls SslTransportLayer.close from 
> ControllerChannelManager.removeExistingBroker
> * RequestSendThread can call SslTransportLayer.write or 
> SslTransportLayer.handshake from NetworkClient.poll
> Suppose the controller moves for whatever reason. The former controller could 
> have had a RequestSendThread who was in the middle of sending out messages to 
> the cluster while the ZkEventThread began executing 
> KafkaController.onControllerResignation, which calls 
> ControllerChannelManager.shutdown, which sequentially cleans up the 
> controller-to-broker queue and connection for every broker in the cluster. 
> This cleanup includes the call to 
> ControllerChannelManager.removeExistingBroker as mentioned earlier, causing 
> the concurrent call to SSLEngine.wrap. This concurrent call throws a 
> BufferOverflowException which ControllerChannelManager.removeExistingBroker 
> catches so the ControllerChannelManager.shutdown moves onto cleaning up the 
> next controller-to-broker queue and connection, skipping the cleanup steps 
> such as clearing the queue, stopping the RequestSendThread, and removing the 
> entry from its brokerStateInfo map.
> By failing out of the Selector.close, the sensors corresponding to the broker 
> connection have not been cleaned up. Any later attempt at initializing an 
> identical Selector will result in a sensor collision and therefore cause 
> Selector initialization to throw an exception. In other words, any later 
> attempts by this broker to become controller again will fail on 
> initialization. When controller initialization fails, the controller deletes 
> the /controller znode and lets another broker take over.
> Now suppose the controller moves enough times such that every broker hits the 
> BufferOverflowException concurrency issue. We're now guaranteed to fail 
> controller initialization due to the sensor collision on every controller 
> transition, so the controller will move across brokers continuously.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Work started] (KAFKA-4959) remove controller concurrent access to non-threadsafe NetworkClient, Selector, and SSLEngine

2017-03-27 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-4959 started by Onur Karaman.
---
> remove controller concurrent access to non-threadsafe NetworkClient, 
> Selector, and SSLEngine
> 
>
> Key: KAFKA-4959
> URL: https://issues.apache.org/jira/browse/KAFKA-4959
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> This brought down a cluster by causing continuous controller moves.
> ZkClient's ZkEventThread and a RequestSendThread can concurrently use objects 
> that aren't thread-safe:
> * Selector
> * NetworkClient
> * SSLEngine (this was the big one for us. We turn on SSL for interbroker 
> communication).
> As per the "Concurrency Notes" section from the [SSLEngine 
> javadoc|https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html]:
> bq. two threads must not attempt to call the same method (either wrap() or 
> unwrap()) concurrently
> SSLEngine.wrap gets called in:
> * SslTransportLayer.write
> * SslTransportLayer.handshake
> * SslTransportLayer.close
> It turns out that the ZkEventThread and RequestSendThread can concurrently 
> call SSLEngine.wrap:
> * ZkEventThread calls SslTransportLayer.close from 
> ControllerChannelManager.removeExistingBroker
> * RequestSendThread can call SslTransportLayer.write or 
> SslTransportLayer.handshake from NetworkClient.poll
> Suppose the controller moves for whatever reason. The former controller could 
> have had a RequestSendThread who was in the middle of sending out messages to 
> the cluster while the ZkEventThread began executing 
> KafkaController.onControllerResignation, which calls 
> ControllerChannelManager.shutdown, which sequentially cleans up the 
> controller-to-broker queue and connection for every broker in the cluster. 
> This cleanup includes the call to 
> ControllerChannelManager.removeExistingBroker as mentioned earlier, causing 
> the concurrent call to SSLEngine.wrap. This concurrent call throws a 
> BufferOverflowException which ControllerChannelManager.removeExistingBroker 
> catches so the ControllerChannelManager.shutdown moves onto cleaning up the 
> next controller-to-broker queue and connection, skipping the cleanup steps 
> such as clearing the queue, stopping the RequestSendThread, and removing the 
> entry from its brokerStateInfo map.
> By failing out of the Selector.close, the sensors corresponding to the broker 
> connection have not been cleaned up. Any later attempt at initializing an 
> identical Selector will result in a sensor collision and therefore cause 
> Selector initialization to throw an exception. In other words, any later 
> attempts by this broker to become controller again will fail on 
> initialization. When controller initialization fails, the controller deletes 
> the /controller znode and lets another broker take over.
> Now suppose the controller moves enough times such that every broker hits the 
> BufferOverflowException concurrency issue. We're now guaranteed to fail 
> controller initialization due to the sensor collision on every controller 
> transition, so the controller will move across brokers continuously.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4959) remove controller concurrent access to non-threadsafe NetworkClient, Selector, and SSLEngine

2017-03-27 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-4959:
---

 Summary: remove controller concurrent access to non-threadsafe 
NetworkClient, Selector, and SSLEngine
 Key: KAFKA-4959
 URL: https://issues.apache.org/jira/browse/KAFKA-4959
 Project: Kafka
  Issue Type: Bug
Reporter: Onur Karaman
Assignee: Onur Karaman


This brought down a cluster by causing continuous controller moves.

ZkClient's ZkEventThread and a RequestSendThread can concurrently use objects 
that aren't thread-safe:
* Selector
* NetworkClient
* SSLEngine (this was the big one for us. We turn on SSL for interbroker 
communication).

As per the "Concurrency Notes" section from the [SSLEngine 
javadoc|https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html]:
bq. two threads must not attempt to call the same method (either wrap() or 
unwrap()) concurrently

SSLEngine.wrap gets called in:
* SslTransportLayer.write
* SslTransportLayer.handshake
* SslTransportLayer.close

It turns out that the ZkEventThread and RequestSendThread can concurrently call 
SSLEngine.wrap:
* ZkEventThread calls SslTransportLayer.close from 
ControllerChannelManager.removeExistingBroker
* RequestSendThread can call SslTransportLayer.write or 
SslTransportLayer.handshake from NetworkClient.poll

Suppose the controller moves for whatever reason. The former controller could 
have had a RequestSendThread who was in the middle of sending out messages to 
the cluster while the ZkEventThread began executing 
KafkaController.onControllerResignation, which calls 
ControllerChannelManager.shutdown, which sequentially cleans up the 
controller-to-broker queue and connection for every broker in the cluster. This 
cleanup includes the call to ControllerChannelManager.removeExistingBroker as 
mentioned earlier, causing the concurrent call to SSLEngine.wrap. This 
concurrent call throws a BufferOverflowException which 
ControllerChannelManager.removeExistingBroker catches so the 
ControllerChannelManager.shutdown moves onto cleaning up the next 
controller-to-broker queue and connection, skipping the cleanup steps such as 
clearing the queue, stopping the RequestSendThread, and removing the entry from 
its brokerStateInfo map.

By failing out of the Selector.close, the sensors corresponding to the broker 
connection have not been cleaned up. Any later attempt at initializing an 
identical Selector will result in a sensor collision and therefore cause 
Selector initialization to throw an exception. In other words, any later 
attempts by this broker to become controller again will fail on initialization. 
When controller initialization fails, the controller deletes the /controller 
znode and lets another broker take over.

Now suppose the controller moves enough times such that every broker hits the 
BufferOverflowException concurrency issue. We're now guaranteed to fail 
controller initialization due to the sensor collision on every controller 
transition, so the controller will move across brokers continuously.
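
The race described above is structural rather than SSL-specific, so a stripped-down illustration can use a stand-in for the non-thread-safe transport object. This is not SslTransportLayer; the classes and the failure below are contrived purely to show the shape of the interleaving:

{code}
object ControllerChannelRaceSketch extends App {
  // Stand-in for a non-thread-safe object such as Selector/NetworkClient/SSLEngine.
  class FragileTransport {
    @volatile private var buffer: Array[Int] = new Array[Int](16)
    def write(): Unit = { val b = buffer; Thread.sleep(1); b(0) = 1 } // no mutual exclusion
    def close(): Unit = { buffer = null }                             // no mutual exclusion
  }

  val transport = new FragileTransport

  // Plays the role of the RequestSendThread draining the controller-to-broker queue.
  val sendThread = new Thread(new Runnable {
    def run(): Unit =
      try { while (true) transport.write() }
      catch { case e: Throwable => println(s"send thread died mid-write: $e") }
  })

  sendThread.start()
  Thread.sleep(10)
  // Plays the role of the ZkEventThread running onControllerResignation ->
  // ControllerChannelManager.shutdown -> removeExistingBroker -> close().
  transport.close()
  sendThread.join()
}
{code}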



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4900) Brokers stuck in controller re-election loop after failing to register metrics

2017-03-27 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943568#comment-15943568
 ] 

Onur Karaman commented on KAFKA-4900:
-

[~nickt] Do you think it makes sense for me to open a new ticket specific to 
the BufferOverflowException concurrency issue and link all related underlying 
issues to this ticket?

> Brokers stuck in controller re-election loop after failing to register metrics
> --
>
> Key: KAFKA-4900
> URL: https://issues.apache.org/jira/browse/KAFKA-4900
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, core
>Affects Versions: 0.10.1.1
>Reporter: Nick Travers
>
> We hit this today in one of our three-node staging clusters. The exception 
> continues to occur on all three nodes.
> {code}
> 2017-03-15 02:17:30,677 ERROR 
> [ZkClient-EventThread-35-samsa-zkserver.stage.sjc1.square:26101/samsa] 
> server.ZookeeperLeaderElector - Error while electing or becoming leader on 
> broker 9
> java.lang.IllegalArgumentException: A metric named 'MetricName 
> [name=connection-close-rate, 
> group=controller-channel-metrics,description=Connections closed per second in 
> the window., tags={broker-id=10}]' already exists, can't register another one.
> at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:380)
> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:179)
> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:164)
> at 
> org.apache.kafka.common.network.Selector$SelectorMetrics.<init>(Selector.java:617)
> at org.apache.kafka.common.network.Selector.<init>(Selector.java:138)
> at 
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:101)
> at 
> kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:45)
> at 
> kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:45)
> at scala.collection.immutable.Set$Set3.foreach(Set.scala:163)
> at 
> kafka.controller.ControllerChannelManager.<init>(ControllerChannelManager.scala:45)
> at 
> kafka.controller.KafkaController.startChannelManager(KafkaController.scala:814)
> at 
> kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:742)
> at 
> kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:334)
> at 
> kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:167)
> at 
> kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141)
> at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:824)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) 
> {code}
> We observe a tight loop of controller (re-)election, i.e. one node hits this 
> exception, and leadership transitions to the next, which then hits the 
> exception, ad infinitum.
> Producers and consumers appear to be connecting ok, and are able to produce 
> and consume messages.
> Relevant data points:
>  - prior to this cluster restart, a partition reassignment was attempted for 
> a number of topics, which appeared to get stuck in the "in progress" state 
> (on the order of days)
>  - these topics were subsequently deleted
>  - a rolling restart of the cluster was performed to turn on 
> broker-to-broker SSL communication
>  - the SSL change has subsequently been _rolled back_ after we observed these 
> exceptions
>  - the entire cluster was shut down, and nodes brought back one at a time in 
> an attempt to clear the exception. We were able to restart the cluster, but 
> we continue to see the exceptions
> We also observed, during the same time as the exception above, the following 
> exception on all hosts:
> {code}
> 2017-03-15 01:44:04,572 ERROR 
> [ZkClient-EventThread-36-samsa-zkserver.stage.sjc1.square:26101/samsa] 
> controller.ReplicaStateMachine$BrokerChangeListener - [BrokerChangeListener 
> on Controller 10]: Error while handling broker changes
> java.lang.ClassCastException: java.lang.String cannot be cast to 
> 

[jira] [Commented] (KAFKA-4900) Brokers stuck in controller re-election loop after failing to register metrics

2017-03-24 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941296#comment-15941296
 ] 

Onur Karaman commented on KAFKA-4900:
-

I haven't figured out some of the small details but I think I figured out the 
gist of what happened at LinkedIn.

One problem is that ZkClient's ZkEventThread and a RequestSendThread can 
concurrently use objects that aren't thread-safe:
* Selector
* NetworkClient
* SSLEngine (this was the big one for us. We turn on SSL for interbroker 
communication).

As per the "Concurrency Notes" section from the [SSLEngine 
javadoc|https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html]:
bq. two threads must not attempt to call the same method (either wrap() or 
unwrap()) concurrently

SSLEngine.wrap gets called in:
* SslTransportLayer.write
* SslTransportLayer.handshake
* SslTransportLayer.close

It turns out that the ZkEventThread and RequestSendThread can concurrently call 
SSLEngine.wrap:
* ZkEventThread calls SslTransportLayer.close from 
ControllerChannelManager.removeExistingBroker
* RequestSendThread can call SslTransportLayer.write or 
SslTransportLayer.handshake from NetworkClient.poll

Suppose the controller moves for whatever reason. The former controller could 
have had a RequestSendThread who was in the middle of sending out messages to 
the cluster while the ZkEventThread began executing 
KafkaController.onControllerResignation, which calls 
ControllerChannelManager.shutdown, which sequentially cleans up the 
controller-to-broker queue and connection for every broker in the cluster. This 
cleanup includes the call to ControllerChannelManager.removeExistingBroker as 
mentioned earlier, causing the concurrent call to SSLEngine.wrap. This 
concurrent call throws a BufferOverflowException which 
ControllerChannelManager.removeExistingBroker catches so the 
ControllerChannelManager.shutdown moves onto cleaning up the next 
controller-to-broker queue and connection, skipping the cleanup steps such as 
clearing the queue, stopping the RequestSendThread, and removing the entry from 
its brokerStateInfo map.

By failing out of the Selector.close, the sensors corresponding to the broker 
connection have not been cleaned up. Any later attempt at initializing an 
identical Selector will result in a sensor collision and therefore cause 
Selector initialization to throw an exception. In other words, any later 
attempts by this broker to become controller again will fail on initialization. 
When controller initialization fails, the controller deletes the /controller 
znode and lets another broker take over.

Now suppose the controller moves enough times such that every broker hits the 
BufferOverflowException concurrency issue. We're now guaranteed to fail 
controller initialization due to the sensor collision on every controller 
transition, so the controller will move across brokers continuously.

The "connection-close-rate" is specifically cited because that's always the 
first sensor registered upon Selector initialization and therefore is the 
collision we see in the logs.

Now the question is why did the controller move enough times to hit every 
broker in the cluster with the BufferOverflowException to begin with? 
Considering the BufferOverflowException on resignation issue alone, it's 
plausible that the cluster should stabilize after the first 
BufferOverflowException.

We were running 0.10.1.1, where a controller can process events from ZkClient 
even after resigning as controller (fixed in 0.10.2.0 in KAFKA-4447). For 
instance, processing an isr change notification after resignation causes a 
NullPointerException while broadcasting an UpdateMetadataRequest to the cluster 
since the earlier controller resignation made the ControllerChannelManager 
null. This NullPointerException causes the controller to delete the /controller 
znode even though it's no longer the controller, kicking out the current active 
controller, potentially causing the same scenario to repeat. Every broker's 
logs had reported this NullPointerException from the 
IsrChangeNotificationListener, which explains how the controller moved around 
enough times such that every broker experienced the initial 
BufferOverflowException.
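
The metric-collision part of this loop reduces to a generic "failed cleanup leaks a named registration" pattern. The registry below is a toy stand-in for the real Metrics/Sensor classes, purely to show why one swallowed exception during shutdown makes every later election attempt on that broker fail:

{code}
import scala.collection.mutable

object SensorLeakSketch extends App {
  // Toy stand-in for the metrics registry: duplicate names are fatal.
  object Registry {
    private val names = mutable.Set[String]()
    def register(name: String): Unit =
      if (!names.add(name))
        throw new IllegalArgumentException(s"A metric named '$name' already exists")
    def unregister(name: String): Unit = names.remove(name)
  }

  def metricName(brokerId: Int) = s"connection-close-rate,broker-id=$brokerId"

  def becomeController(brokerId: Int): Unit = Registry.register(metricName(brokerId))

  def resign(brokerId: Int, closeThrows: Boolean): Unit = {
    // If close() throws (the swallowed BufferOverflowException in
    // removeExistingBroker), the unregister step never runs and the name leaks.
    if (!closeThrows) Registry.unregister(metricName(brokerId))
  }

  becomeController(10)
  resign(10, closeThrows = true) // cleanup skipped, sensor leaked
  try becomeController(10)       // the next election attempt on this broker fails
  catch { case e: IllegalArgumentException => println(s"re-election fails: ${e.getMessage}") }
}
{code}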

> Brokers stuck in controller re-election loop after failing to register metrics
> --
>
> Key: KAFKA-4900
> URL: https://issues.apache.org/jira/browse/KAFKA-4900
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, core
>Affects Versions: 0.10.1.1
>Reporter: Nick Travers
>
> We hit this today in one of our three-node staging clusters. The exception 
> continues to occur on all three nodes.
> {code}
> 2017-03-15 02:17:30,677 ERROR 
> [ZkClient-EventThread-35-samsa-zkserver.stage.sjc1.square:26101/samsa] 
> 

[jira] [Commented] (KAFKA-4900) Brokers stuck in controller re-election loop after failing to register metrics

2017-03-16 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929447#comment-15929447
 ] 

Onur Karaman commented on KAFKA-4900:
-

Okay yeah with bad znode data, I could see how the controller would bounce 
around until the znode was cleaned.

We didn't have a malformed znode so we're experiencing something else. I think 
I made some decent progress today. I'll make sure to report back what happened 
to us once it's all figured out.

> Brokers stuck in controller re-election loop after failing to register metrics
> --
>
> Key: KAFKA-4900
> URL: https://issues.apache.org/jira/browse/KAFKA-4900
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, core
>Affects Versions: 0.10.1.1
>Reporter: Nick Travers
>
> We hit this today in one of our three-node staging clusters. The exception 
> continues to occur on all three nodes.
> {code}
> 2017-03-15 02:17:30,677 ERROR 
> [ZkClient-EventThread-35-samsa-zkserver.stage.sjc1.square:26101/samsa] 
> server.ZookeeperLeaderElector - Error while electing or becoming leader on 
> broker 9
> java.lang.IllegalArgumentException: A metric named 'MetricName 
> [name=connection-close-rate, 
> group=controller-channel-metrics,description=Connections closed per second in 
> the window., tags={broker-id=10}]' already exists, can't register another one.
> at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:380)
> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:179)
> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:164)
> at 
> org.apache.kafka.common.network.Selector$SelectorMetrics.<init>(Selector.java:617)
> at org.apache.kafka.common.network.Selector.<init>(Selector.java:138)
> at 
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:101)
> at 
> kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:45)
> at 
> kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:45)
> at scala.collection.immutable.Set$Set3.foreach(Set.scala:163)
> at 
> kafka.controller.ControllerChannelManager.<init>(ControllerChannelManager.scala:45)
> at 
> kafka.controller.KafkaController.startChannelManager(KafkaController.scala:814)
> at 
> kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:742)
> at 
> kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:334)
> at 
> kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:167)
> at 
> kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141)
> at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:824)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) 
> {code}
> We observe a tight loop of controller (re-)election, i.e. one node hits this 
> exception, and leadership transitions to the next, which then hits the 
> exception, ad infinitum.
> Producers and consumers appear to be connecting ok, and are able to produce 
> and consume messages.
> Relevant data points:
>  - prior to this cluster restart, a partition reassignment was attempted for 
> a number of topics, which appeared to get stuck in the "in progress" state 
> (on the order of days)
>  - these topics were subsequently deleted
>  - a rolling restart of the cluster was performed to turn on 
> broker-to-broker SSL communication
>  - the SSL change has subsequently been _rolled back_ after we observed these 
> exceptions
>  - the entire cluster was shut down, and nodes brought back one at a time in 
> an attempt to clear the exception. We were able to restart the cluster, but 
> we continue to see the exceptions
> We also observed, during the same time as the exception above, the following 
> exception on all hosts:
> {code}
> 2017-03-15 01:44:04,572 ERROR 
> [ZkClient-EventThread-36-samsa-zkserver.stage.sjc1.square:26101/samsa] 
> controller.ReplicaStateMachine$BrokerChangeListener - [BrokerChangeListener 
> on 

[jira] [Commented] (KAFKA-4893) async topic deletion conflicts with max topic length

2017-03-15 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927298#comment-15927298
 ] 

Onur Karaman commented on KAFKA-4893:
-

Also [~vahid] while I agree reducing the limit is by far the easiest, it 
definitely feels like we're moving the goalpost. We should instead try to 
maintain our guarantees as long as it's not exceedingly difficult. In this 
scenario, it doesn't feel exceedingly difficult.

> async topic deletion conflicts with max topic length
> 
>
> Key: KAFKA-4893
> URL: https://issues.apache.org/jira/browse/KAFKA-4893
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Vahid Hashemian
>Priority: Minor
>
> As per the 
> [documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], 
> topics can be only 249 characters long to line up with typical filesystem 
> limitations:
> {quote}
> Each sharded partition log is placed into its own folder under the Kafka log 
> directory. The name of such folders consists of the topic name, appended by a 
> dash (\-) and the partition id. Since a typical folder name can not be over 
> 255 characters long, there will be a limitation on the length of topic names. 
> We assume the number of partitions will not ever be above 100,000. Therefore, 
> topic names cannot be longer than 249 characters. This leaves just enough 
> room in the folder name for a dash and a potentially 5 digit long partition 
> id.
> {quote}
> {{kafka.common.Topic.maxNameLength}} is set to 249 and is used during 
> validation.
> This limit ends up not being quite right since topic deletion ends up 
> renaming the directory to the form {{topic-partition.uniqueId-delete}} as can 
> be seen in {{LogManager.asyncDelete}}:
> {code}
> val dirName = new StringBuilder(removedLog.name)
>   .append(".")
>   .append(java.util.UUID.randomUUID.toString.replaceAll("-",""))
>   .append(Log.DeleteDirSuffix)
>   .toString()
> {code}
> So the unique id and "-delete" suffix end up hogging some of the characters. 
> Deleting a long-named topic results in a log message such as the following:
> {code}
> kafka.common.KafkaStorageException: Failed to rename log directory from 
> /tmp/kafka-logs0/0-0
>  to 
> /tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete
>   at kafka.log.LogManager.asyncDelete(LogManager.scala:439)
>   at 
> kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221)
>   at kafka.cluster.Partition.delete(Partition.scala:137)
>   at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:230)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:260)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:259)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:259)
>   at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:174)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:64)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> The topic after this point still exists but has Leader set to -1 and the 
> controller recognizes the topic completion as incomplete (the topic znode is 
> still in /admin/delete_topics).
> I don't believe linkedin has any topic name this long but I'm making the 
> ticket in case anyone runs into this problem.
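
To make the arithmetic explicit (my own back-of-the-envelope numbers, not something stated in the ticket): the rename adds a dot, a 32-character UUID with its dashes stripped, and the "-delete" suffix, so the name that has to survive deletion is roughly 40 characters longer than the live directory name.

{code}
object TopicNameBudgetSketch extends App {
  val filesystemLimit = 255
  val dash            = 1
  val partitionDigits = 5   // partitions assumed to stay below 100,000
  val liveLimit       = filesystemLimit - dash - partitionDigits   // 249, the documented limit
  val deleteOverhead  = ".".length + 32 + "-delete".length         // UUID has its dashes stripped
  val renameSafeLimit = liveLimit - deleteOverhead                 // names longer than this hit the rename failure
  println(s"live limit: $liveLimit, rename-safe limit: $renameSafeLimit")
}
{code}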



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4900) Brokers stuck in controller re-election loop after failing to register metrics

2017-03-15 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927200#comment-15927200
 ] 

Onur Karaman commented on KAFKA-4900:
-

I hadn't yet investigated why some of the fetchers hadn't started up, but it 
makes me wonder if we should change the controller-broker interaction such that 
either:
# the expected leadership/followership is periodically heartbeated from 
controller to broker
# the current leadership/followership is periodically heartbeated from broker 
to controller

> Brokers stuck in controller re-election loop after failing to register metrics
> --
>
> Key: KAFKA-4900
> URL: https://issues.apache.org/jira/browse/KAFKA-4900
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, core
>Affects Versions: 0.10.1.1
>Reporter: Nick Travers
>
> We hit this today in one of our three-node staging clusters. The exception 
> continues to occur on all three nodes.
> {code}
> 2017-03-15 02:17:30,677 ERROR 
> [ZkClient-EventThread-35-samsa-zkserver.stage.sjc1.square:26101/samsa] 
> server.ZookeeperLeaderElector - Error while electing or becoming leader on 
> broker 9
> java.lang.IllegalArgumentException: A metric named 'MetricName 
> [name=connection-close-rate, 
> group=controller-channel-metrics,description=Connections closed per second in 
> the window., tags={broker-id=10}]' already exists, can't register another one.
> at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:380)
> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:179)
> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:164)
> at 
> org.apache.kafka.common.network.Selector$SelectorMetrics.<init>(Selector.java:617)
> at org.apache.kafka.common.network.Selector.<init>(Selector.java:138)
> at 
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:101)
> at 
> kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:45)
> at 
> kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:45)
> at scala.collection.immutable.Set$Set3.foreach(Set.scala:163)
> at 
> kafka.controller.ControllerChannelManager.<init>(ControllerChannelManager.scala:45)
> at 
> kafka.controller.KafkaController.startChannelManager(KafkaController.scala:814)
> at 
> kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:742)
> at 
> kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:334)
> at 
> kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:167)
> at 
> kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141)
> at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:824)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) 
> {code}
> We observe a tight loop of controller (re-)election, i.e. one node hits this 
> exception, and leadership transitions to the next, which then hits the 
> exception, ad infinitum.
> Producers and consumers appear to be connecting ok, and are able to produce 
> and consume messages.
> Relevant data points:
>  - prior to this cluster restart, a partition reassignment was attempted for 
> a number of topics, which appeared to get stuck in the "in progress" state 
> (on the order of days)
>  - these topics were subsequently deleted
>  - a rolling restart of the cluster was performed to turn on 
> broker-to-broker SSL communication
>  - the SSL change has subsequently been _rolled back_ after we observed these 
> exceptions
>  - the entire cluster was shut down, and nodes brought back one at a time in 
> an attempt to clear the exception. We were able to restart the cluster, but 
> we continue to see the exceptions
> We also observed, during the same time as the exception above, the following 
> exception on all hosts:
> {code}
> 2017-03-15 01:44:04,572 ERROR 
> [ZkClient-EventThread-36-samsa-zkserver.stage.sjc1.square:26101/samsa] 
> 

[jira] [Comment Edited] (KAFKA-4893) async topic deletion conflicts with max topic length

2017-03-15 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927155#comment-15927155
 ] 

Onur Karaman edited comment on KAFKA-4893 at 3/15/17 11:23 PM:
---

[~ijuma] I had thought about the additional fsync on the directory as well 
after posting the earlier comment.

Would the following structure help?
{code}
kafkaLogDir
| -- cleaner-offset-checkpoint
| -- delete
|    | -- 8fc621a18fe746d1b8eb4a7fa55a04bc
|    |    | -- .index
|    |    | -- .log
|    |    | -- .timeindex
|    |    | -- t-0
| -- meta.properties
| -- recovery-point-offset-checkpoint
| -- replication-offset-checkpoint
{code}

We'd put all the partitions staged for async deletion under kafkaLogDir/delete. 
Each directory corresponding to a partition staged for async deletion would 
have a new empty file with filename topic-partition so you can figure out later 
on what partition the data belonged to.

The kafkaLogDir/delete directory would just be a trash bin directory that 
always exists, and we'd just delete all of the data inside the trash bin.


was (Author: onurkaraman):
[~ijuma] I had thought about the additional fsync on the directory as well 
after posting the earlier comment.

Would the following structure help?
{code}
kafkaLogDir
| -- cleaner-offset-checkpoint
| -- delete
|    | -- 8fc621a18fe746d1b8eb4a7fa55a04bc
|    |    | -- .index
|    |    | -- .log
|    |    | -- .timeindex
|    |    | -- t-0
| -- meta.properties
| -- recovery-point-offset-checkpoint
| -- replication-offset-checkpoint
{code}

The kafkaLogDir/delete directory would always be there. We'd put all the 
partitions staged for async deletion under kafkaLogDir/delete. Each directory 
corresponding to a partition staged for async deletion would have a new empty 
file with filename topic-partition so you can figure out later on what 
partition the data belonged to.

> async topic deletion conflicts with max topic length
> 
>
> Key: KAFKA-4893
> URL: https://issues.apache.org/jira/browse/KAFKA-4893
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Vahid Hashemian
>Priority: Minor
>
> As per the 
> [documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], 
> topics can be only 249 characters long to line up with typical filesystem 
> limitations:
> {quote}
> Each sharded partition log is placed into its own folder under the Kafka log 
> directory. The name of such folders consists of the topic name, appended by a 
> dash (\-) and the partition id. Since a typical folder name can not be over 
> 255 characters long, there will be a limitation on the length of topic names. 
> We assume the number of partitions will not ever be above 100,000. Therefore, 
> topic names cannot be longer than 249 characters. This leaves just enough 
> room in the folder name for a dash and a potentially 5 digit long partition 
> id.
> {quote}
> {{kafka.common.Topic.maxNameLength}} is set to 249 and is used during 
> validation.
> This limit ends up not being quite right since topic deletion ends up 
> renaming the directory to the form {{topic-partition.uniqueId-delete}} as can 
> be seen in {{LogManager.asyncDelete}}:
> {code}
> val dirName = new StringBuilder(removedLog.name)
>   .append(".")
>   
> .append(java.util.UUID.randomUUID.toString.replaceAll("-",""))
>   .append(Log.DeleteDirSuffix)
>   .toString()
> {code}
> So the unique id and "-delete" suffix end up hogging some of the characters. 
> Deleting a long-named topic results in a log message such as the following:
> {code}
> kafka.common.KafkaStorageException: Failed to rename log directory from 
> /tmp/kafka-logs0/0-0
>  to 
> /tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete
>   at kafka.log.LogManager.asyncDelete(LogManager.scala:439)
>   at 
> kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
>   at 

[jira] [Commented] (KAFKA-4893) async topic deletion conflicts with max topic length

2017-03-15 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15927155#comment-15927155
 ] 

Onur Karaman commented on KAFKA-4893:
-

[~ijuma] I had thought about the additional fsync on the directory as well 
after posting the earlier comment.

Would the following structure help?
{code}
kafkaLogDir
| -- cleaner-offset-checkpoint
| -- delete
|    | -- 8fc621a18fe746d1b8eb4a7fa55a04bc
|    |    | -- .index
|    |    | -- .log
|    |    | -- .timeindex
|    |    | -- t-0
| -- meta.properties
| -- recovery-point-offset-checkpoint
| -- replication-offset-checkpoint
{code}

The kafkaLogDir/delete directory would always be there. We'd put all the 
partitions staged for async deletion under kafkaLogDir/delete. Each directory 
corresponding to a partition staged for async deletion would have a new empty 
file with filename topic-partition so you can figure out later on what 
partition the data belonged to.

> async topic deletion conflicts with max topic length
> 
>
> Key: KAFKA-4893
> URL: https://issues.apache.org/jira/browse/KAFKA-4893
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Vahid Hashemian
>Priority: Minor
>
> As per the 
> [documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], 
> topics can be only 249 characters long to line up with typical filesystem 
> limitations:
> {quote}
> Each sharded partition log is placed into its own folder under the Kafka log 
> directory. The name of such folders consists of the topic name, appended by a 
> dash (\-) and the partition id. Since a typical folder name can not be over 
> 255 characters long, there will be a limitation on the length of topic names. 
> We assume the number of partitions will not ever be above 100,000. Therefore, 
> topic names cannot be longer than 249 characters. This leaves just enough 
> room in the folder name for a dash and a potentially 5 digit long partition 
> id.
> {quote}
> {{kafka.common.Topic.maxNameLength}} is set to 249 and is used during 
> validation.
> This limit ends up not being quite right since topic deletion ends up 
> renaming the directory to the form {{topic-partition.uniqueId-delete}} as can 
> be seen in {{LogManager.asyncDelete}}:
> {code}
> val dirName = new StringBuilder(removedLog.name)
>   .append(".")
>   
> .append(java.util.UUID.randomUUID.toString.replaceAll("-",""))
>   .append(Log.DeleteDirSuffix)
>   .toString()
> {code}
> So the unique id and "-delete" suffix end up hogging some of the characters. 
> Deleting a long-named topic results in a log message such as the following:
> {code}
> kafka.common.KafkaStorageException: Failed to rename log directory from 
> /tmp/kafka-logs0/0-0
>  to 
> /tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete
>   at kafka.log.LogManager.asyncDelete(LogManager.scala:439)
>   at 
> kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221)
>   at kafka.cluster.Partition.delete(Partition.scala:137)
>   at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:230)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:260)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:259)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:259)
>   at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:174)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:64)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> The topic after this point still 

[jira] [Comment Edited] (KAFKA-4900) Brokers stuck in controller re-election loop after failing to register metrics

2017-03-15 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925611#comment-15925611
 ] 

Onur Karaman edited comment on KAFKA-4900 at 3/15/17 6:44 AM:
--

For reference, I think the controller was changing every 3 seconds or so during 
the re-election loop.

We were able to stabilize the cluster after about two hours but haven't yet 
figured out the root cause.

We didn't know the cause, so we definitely didn't take the optimal sequence of 
steps to stabilize the cluster. Anyways, here's what we did:
# We figured deleting the /controller znode wouldn't make a difference since it 
was anyways changing every 3 seconds.
# We initially tried bouncing what we had incorrectly thought was the sole 
broker corresponding to the metric collision, but this didn't help. The 
re-election loop continued, and we later found out in the logs that there had 
been "connection-close-rate" metrics collisions tagged with other brokers.
# We then shifted traffic out of the bad cluster, killed all of the brokers, 
and restarted them all. Getting the cluster back up and recovering its 
unflushed segments took around 25 minutes.
# While the re-election loop had stopped, we still saw under-replicated 
partitions after the cluster was back up and later found that some of the 
fetchers hadn't been started. I think at this point we just deleted the 
/controller znode to elect a new controller with the hopes of it broadcasting 
leadership/followership of partitions to the cluster so that the broker's 
fetchers can start up correctly. After this, the under-replicated partitions 
came back down to approximately zero. We think the few under-replicated 
partitions here and there are from uneven partition leadership distribution but 
this wasn't a big deal so we haven't attempted to fix the few remaining 
under-replicated partitions yet.


was (Author: onurkaraman):
For reference, I think the controller was changing every 3 seconds or so during 
the re-election loop.

We were able to stabilize the cluster after about two hours but haven't yet 
figured out the root cause.

We didn't know the cause, so we definitely didn't take the optimal sequence of 
steps to stabilize the cluster. Anyways, here's what we did:
# We figured deleting the /controller znode wouldn't make a difference since it 
was anyways changing every 3 seconds.
# We initially tried bouncing what we had incorrectly thought was the sole 
broker corresponding to the metric collision, but this didn't help. The 
re-election loop continued, and we later found out in the logs that there had 
been "connection-close-rate" metrics collisions tagged with other brokers.
# We then shifted traffic out of the bad cluster, killed all of the brokers, 
and restarted them all. Getting the cluster back up and recovering its 
unflushed segments took around 25 minutes.
# We still saw under-replicated partitions after the cluster was back up and 
later found that some of the fetchers hadn't been started. I think at this 
point we just deleted the /controller znode to elect a new controller with the 
hopes of it broadcasting leadership/followership of partitions to the cluster 
so that the broker's fetchers can start up correctly. After this, the 
under-replicated partitions came back down to approximately zero. We think the 
few under-replicated partitions here and there are from uneven partition 
leadership distribution but this wasn't a big deal so we haven't attempted to 
fix the few remaining under-replicated partitions yet.

> Brokers stuck in controller re-election loop after failing to register metrics
> --
>
> Key: KAFKA-4900
> URL: https://issues.apache.org/jira/browse/KAFKA-4900
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, core
>Affects Versions: 0.10.1.1
>Reporter: Nick Travers
>
> We hit this today in one of our three node staging clusters. The exception 
> continues to occur on all three nodes.
> {code}
> 2017-03-15 02:17:30,677 ERROR 
> [ZkClient-EventThread-35-samsa-zkserver.stage.sjc1.square:26101/samsa] 
> server.ZookeeperLeaderElector - Error while electing or becoming leader on 
> broker 9
> java.lang.IllegalArgumentException: A metric named 'MetricName 
> [name=connection-close-rate, 
> group=controller-channel-metrics,description=Connections closed per second in 
> the window., tags={broker-id=10}]' already exists, can't register another one.
> at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:380)
> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:179)
> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:164)
> at 
> org.apache.kafka.common.network.Selector$SelectorMetrics.<init>(Selector.java:617)
> at 

[jira] [Commented] (KAFKA-4900) Brokers stuck in controller re-election loop after failing to register metrics

2017-03-15 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925611#comment-15925611
 ] 

Onur Karaman commented on KAFKA-4900:
-

For reference, I think the controller was changing every 3 seconds or so during 
the re-election loop.

We were able to stabilize the cluster after about two hours but haven't yet 
figured out the root cause.

We didn't know the cause, so we definitely didn't take the optimal sequence of 
steps to stabilize the cluster. Anyways, here's what we did:
# We figured deleting the /controller znode wouldn't make a difference since it 
was anyways changing every 3 seconds.
# We initially tried bouncing what we had incorrectly thought was the sole 
broker corresponding to the metric collision, but this didn't help. The 
re-election loop continued, and we later found out in the logs that there had 
been "connection-close-rate" metrics collisions tagged with other brokers.
# We then shifted traffic out of the bad cluster, killed all of the brokers, 
and restarted them all. Getting the cluster back up and recovering its 
unflushed segments took around 25 minutes.
# We still saw under-replicated partitions after the cluster was back up and 
later found that some of the fetchers hadn't been started. I think at this 
point we just deleted the /controller znode to elect a new controller with the 
hopes of it broadcasting leadership/followership of partitions to the cluster 
so that the broker's fetchers can start up correctly. After this, the 
under-replicated partitions came back down to approximately zero. We think the 
few under-replicated partitions here and there are from uneven partition 
leadership distribution but this wasn't a big deal so we haven't attempted to 
fix the few remaining under-replicated partitions yet.

> Brokers stuck in controller re-election loop after failing to register metrics
> --
>
> Key: KAFKA-4900
> URL: https://issues.apache.org/jira/browse/KAFKA-4900
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, core
>Affects Versions: 0.10.1.1
>Reporter: Nick Travers
>
> We hit this today in one of our three node staging clusters. The exception 
> continues to occur on all three nodes.
> {code}
> 2017-03-15 02:17:30,677 ERROR 
> [ZkClient-EventThread-35-samsa-zkserver.stage.sjc1.square:26101/samsa] 
> server.ZookeeperLeaderElector - Error while electing or becoming leader on 
> broker 9
> java.lang.IllegalArgumentException: A metric named 'MetricName 
> [name=connection-close-rate, 
> group=controller-channel-metrics,description=Connections closed per second in 
> the window., tags={broker-id=10}]' already exists, can't register another one.
> at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:380)
> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:179)
> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:164)
> at 
> org.apache.kafka.common.network.Selector$SelectorMetrics.<init>(Selector.java:617)
> at org.apache.kafka.common.network.Selector.<init>(Selector.java:138)
> at 
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:101)
> at 
> kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:45)
> at 
> kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:45)
> at scala.collection.immutable.Set$Set3.foreach(Set.scala:163)
> at 
> kafka.controller.ControllerChannelManager.<init>(ControllerChannelManager.scala:45)
> at 
> kafka.controller.KafkaController.startChannelManager(KafkaController.scala:814)
> at 
> kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:742)
> at 
> kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:334)
> at 
> kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:167)
> at 
> kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141)
> at 

[jira] [Commented] (KAFKA-4900) Brokers stuck in controller re-election loop after failing to register metrics

2017-03-14 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15925503#comment-15925503
 ] 

Onur Karaman commented on KAFKA-4900:
-

We hit the same re-election loop issue from ZookeeperLeaderElector throwing 
java.lang.IllegalArgumentException due to attempting to register 
"connection-close-rate" in production at linkedin today.

A similar issue of duplicate metric registration came up recently in KAFKA-4811 
as well. Strange.

I don't think we saw the ClassCastException.

> Brokers stuck in controller re-election loop after failing to register metrics
> --
>
> Key: KAFKA-4900
> URL: https://issues.apache.org/jira/browse/KAFKA-4900
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, core
>Affects Versions: 0.10.1.1
>Reporter: Nick Travers
>
> We hit this today in one of our three node staging clusters. The exception 
> continues to occur on all three nodes.
> {code}
> 2017-03-15 02:17:30,677 ERROR 
> [ZkClient-EventThread-35-samsa-zkserver.stage.sjc1.square:26101/samsa] 
> server.ZookeeperLeaderElector - Error while electing or becoming leader on 
> broker 9
> java.lang.IllegalArgumentException: A metric named 'MetricName 
> [name=connection-close-rate, 
> group=controller-channel-metrics,description=Connections closed per second in 
> the window., tags={broker-id=10}]' already exists, can't register another one.
> at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:380)
> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:179)
> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:164)
> at 
> org.apache.kafka.common.network.Selector$SelectorMetrics.<init>(Selector.java:617)
> at org.apache.kafka.common.network.Selector.<init>(Selector.java:138)
> at 
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:101)
> at 
> kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:45)
> at 
> kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:45)
> at scala.collection.immutable.Set$Set3.foreach(Set.scala:163)
> at 
> kafka.controller.ControllerChannelManager.<init>(ControllerChannelManager.scala:45)
> at 
> kafka.controller.KafkaController.startChannelManager(KafkaController.scala:814)
> at 
> kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:742)
> at 
> kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:334)
> at 
> kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:167)
> at 
> kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141)
> at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:824)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) 
> {code}
> We observe a tight loop of controller (re-)election, i.e. one node hits this 
> exception, and leadership transitions to the next, which then hits the 
> exception, ad infinitum.
> Producers and consumers appear to be connecting ok, and are able to produce 
> and consume messages.
> Relevant data points:
>  - prior to this cluster restart, a repartition of a number of topics was 
> attempted, which appeared to get stuck in the "in progress" state (on the 
> order of days)
>  - these topics were subsequently deleted
>  - a rolling restart of the cluster was performed to turn on 
> broker-to-broker SSL communication
>  - the SSL change has subsequently been _rolled back_ after we observed these 
> exceptions
>  - the entire cluster was shut down, and nodes brought back one at a time in 
> an attempt to clear the exception. We were able to restart the cluster, but 
> we continue to see the exceptions
> We also observed, during the same time as the exception above, the following 
> exception on all hosts:
> {code}
> 2017-03-15 01:44:04,572 ERROR 
> [ZkClient-EventThread-36-samsa-zkserver.stage.sjc1.square:26101/samsa] 
> controller.ReplicaStateMachine$BrokerChangeListener - 

[jira] [Commented] (KAFKA-4896) Offset loading can use more threads

2017-03-14 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15924764#comment-15924764
 ] 

Onur Karaman commented on KAFKA-4896:
-

[~junrao] Naively bumping the number of scheduler threads might cause a 
concurrency bug if GroupCoordinator.handleGroupImmigration and 
GroupCoordinator.handleGroupEmigration are called concurrently.

handleGroupImmigration schedules a task (doLoadGroupsAndOffsets) that acquires 
the partitionLock, updates the loadingPartitions, releases the partitionLock, 
loads the metadata into memory, acquires the partitionLock, and updates 
ownedPartitions and loadingPartitions, and then releases the partitionLock.

handleGroupEmigration schedules a task (removeGroupsAndOffsets) that acquires 
the partitionLock, updates ownedPartitions, clears the metadata from memory, 
and then releases the partitionLock.

Adding more threads to the scheduler risks removeGroupsAndOffsets concurrently 
clearing metadata while doLoadGroupsAndOffsets is loading metadata by executing 
between doLoadGroupsAndOffsets's two windows of holding the partitionLock.
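
To illustrate the interleaving, here is a toy model in Scala (the names loosely mirror the real methods, but this is not the actual GroupMetadataManager code and the sleeps just simulate the metadata load):
{code}
import java.util.concurrent.Executors
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

object LoadUnloadRaceSketch {
  private val lock = new ReentrantLock()
  private val loadingPartitions = mutable.Set[Int]()
  private val ownedPartitions = mutable.Set[Int]()

  // Mirrors doLoadGroupsAndOffsets: lock, mark loading, unlock, load, lock, mark owned.
  def load(partition: Int): Unit = {
    lock.lock(); try loadingPartitions += partition finally lock.unlock()
    Thread.sleep(100) // simulate reading group metadata from the log; lock NOT held here
    lock.lock()
    try { ownedPartitions += partition; loadingPartitions -= partition } finally lock.unlock()
  }

  // Mirrors removeGroupsAndOffsets: lock, drop ownership and cached metadata, unlock.
  def unload(partition: Int): Unit = {
    lock.lock(); try ownedPartitions -= partition finally lock.unlock()
  }

  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newScheduledThreadPool(2) // > 1 thread: tasks can overlap
    scheduler.submit(new Runnable { def run(): Unit = load(0) })
    Thread.sleep(20) // emigration arrives while the load sits between its two lock windows
    scheduler.submit(new Runnable { def run(): Unit = unload(0) })
    Thread.sleep(300)
    scheduler.shutdown()
    // Likely prints Set(0): the broker still "owns" a partition it already emigrated.
    println(s"ownedPartitions = $ownedPartitions")
  }
}
{code}
With a single scheduler thread the two tasks are serialized, so this window cannot be hit.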

> Offset loading can use more threads
> ---
>
> Key: KAFKA-4896
> URL: https://issues.apache.org/jira/browse/KAFKA-4896
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.10.2.0
>Reporter: Jun Rao
>  Labels: newbie
>
> Currently, in GroupMetadataManager, we have a single thread for loading the 
> offset cache. We could speed it up with more threads.
>  /* single-thread scheduler to handle offset/group metadata cache loading and 
> unloading */
>   private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = 
> "group-metadata-manager-")



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4893) async topic deletion conflicts with max topic length

2017-03-14 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15923636#comment-15923636
 ] 

Onur Karaman commented on KAFKA-4893:
-

Maybe the right move is to just add more layers to the log directory like so:
{code}
kafkaLogDir/delete/topic-partition/uniqueId/
{code}
and put all the data under that directory.

I think typical filesystems have an overall path length limit of 4096 characters, so we 
should be good to go. The one annoyance is that the old structure is already 
out in the wild, so you may need to recognize and handle both delete directory 
structures for some time.
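
Here is a rough sketch of what recognizing both layouts during startup cleanup could look like (the helper name and regex are assumptions, not actual Kafka code):
{code}
import java.io.File

object DeleteLayoutSketch {
  // Old layout: "<topic>-<partition>.<32-hex-uniqueId>-delete" directly under the log dir.
  private val OldDeletePattern = """(.+)\.[0-9a-f]{32}-delete""".r

  // Returns the topic-partition names staged for deletion under either layout.
  def partitionsStagedForDelete(logDir: File): Seq[String] = {
    val children = Option(logDir.listFiles()).getOrElse(Array.empty[File]).toSeq.filter(_.isDirectory)
    val oldStyle = children.flatMap { dir =>
      dir.getName match {
        case OldDeletePattern(topicPartition) => Some(topicPartition)
        case _                                => None
      }
    }
    // New layout: <logDir>/delete/<topic>-<partition>/<uniqueId>/
    val newStyle = Option(new File(logDir, "delete").listFiles())
      .getOrElse(Array.empty[File]).toSeq.filter(_.isDirectory).map(_.getName)
    oldStyle ++ newStyle
  }
}
{code}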

> async topic deletion conflicts with max topic length
> 
>
> Key: KAFKA-4893
> URL: https://issues.apache.org/jira/browse/KAFKA-4893
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Vahid Hashemian
>Priority: Minor
>
> As per the 
> [documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], 
> topics can be only 249 characters long to line up with typical filesystem 
> limitations:
> {quote}
> Each sharded partition log is placed into its own folder under the Kafka log 
> directory. The name of such folders consists of the topic name, appended by a 
> dash (\-) and the partition id. Since a typical folder name can not be over 
> 255 characters long, there will be a limitation on the length of topic names. 
> We assume the number of partitions will not ever be above 100,000. Therefore, 
> topic names cannot be longer than 249 characters. This leaves just enough 
> room in the folder name for a dash and a potentially 5 digit long partition 
> id.
> {quote}
> {{kafka.common.Topic.maxNameLength}} is set to 249 and is used during 
> validation.
> This limit ends up not being quite right since topic deletion ends up 
> renaming the directory to the form {{topic-partition.uniqueId-delete}} as can 
> be seen in {{LogManager.asyncDelete}}:
> {code}
> val dirName = new StringBuilder(removedLog.name)
>   .append(".")
>   
> .append(java.util.UUID.randomUUID.toString.replaceAll("-",""))
>   .append(Log.DeleteDirSuffix)
>   .toString()
> {code}
> So the unique id and "-delete" suffix end up hogging some of the characters. 
> Deleting a long-named topic results in a log message such as the following:
> {code}
> kafka.common.KafkaStorageException: Failed to rename log directory from 
> /tmp/kafka-logs0/0-0
>  to 
> /tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete
>   at kafka.log.LogManager.asyncDelete(LogManager.scala:439)
>   at 
> kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221)
>   at kafka.cluster.Partition.delete(Partition.scala:137)
>   at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:230)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:260)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:259)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:259)
>   at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:174)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:64)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> The topic after this point still exists but has Leader set to -1, and the 
> controller recognizes the topic deletion as incomplete (the topic znode is 
> still in /admin/delete_topics).
> I don't believe linkedin has any topic name this long but I'm making the 
> ticket in case anyone runs into this problem.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4893) async topic deletion conflicts with max topic length

2017-03-13 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-4893:
---

 Summary: async topic deletion conflicts with max topic length
 Key: KAFKA-4893
 URL: https://issues.apache.org/jira/browse/KAFKA-4893
 Project: Kafka
  Issue Type: Bug
Reporter: Onur Karaman
Priority: Minor


As per the 
[documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], 
topics can be only 249 characters long to line up with typical filesystem 
limitations:
{quote}
Each sharded partition log is placed into its own folder under the Kafka log 
directory. The name of such folders consists of the topic name, appended by a 
dash (\-) and the partition id. Since a typical folder name can not be over 255 
characters long, there will be a limitation on the length of topic names. We 
assume the number of partitions will not ever be above 100,000. Therefore, 
topic names cannot be longer than 249 characters. This leaves just enough room 
in the folder name for a dash and a potentially 5 digit long partition id.
{quote}

{{kafka.common.Topic.maxNameLength}} is set to 249 and is used during 
validation.

This limit ends up not being quite right since topic deletion ends up renaming 
the directory to the form {{topic-partition.uniqueId-delete}} as can be seen in 
{{LogManager.asyncDelete}}:
{code}
val dirName = new StringBuilder(removedLog.name)
  .append(".")
  .append(java.util.UUID.randomUUID.toString.replaceAll("-",""))
  .append(Log.DeleteDirSuffix)
  .toString()
{code}

So the unique id and "-delete" suffix end up hogging some of the characters. 
Deleting a long-named topic results in a log message such as the following:
{code}
kafka.common.KafkaStorageException: Failed to rename log directory from 
/tmp/kafka-logs0/0-0
 to 
/tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete
  at kafka.log.LogManager.asyncDelete(LogManager.scala:439)
  at kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142)
  at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
  at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
  at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221)
  at kafka.cluster.Partition.delete(Partition.scala:137)
  at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:230)
  at 
kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:260)
  at 
kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:259)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:259)
  at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:174)
  at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
  at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:64)
  at java.lang.Thread.run(Thread.java:745)
{code}

The topic after this point still exists but has Leader set to -1, and the 
controller recognizes the topic deletion as incomplete (the topic znode is 
still in /admin/delete_topics).

I don't believe linkedin has any topic name this long but I'm making the ticket 
in case anyone runs into this problem.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4891) kafka.request.logger TRACE regression

2017-03-13 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-4891:
---

 Summary: kafka.request.logger TRACE regression
 Key: KAFKA-4891
 URL: https://issues.apache.org/jira/browse/KAFKA-4891
 Project: Kafka
  Issue Type: Bug
Reporter: Onur Karaman
Assignee: Onur Karaman


Here's what kafka-request.log shows with {{kafka.request.logger}} set to TRACE:
{code}
[2017-03-13 10:06:24,402] TRACE Completed 
request:org.apache.kafka.common.requests.RequestHeader@2f99ef87 -- 
org.apache.kafka.common.requests.LeaderAndIsrRequest@34c40fc5 from connection 
127.0.0.1:9090-127.0.0.1:50969;totalTime:125,requestQueueTime:0,localTime:124,remoteTime:0,responseQueueTime:0,sendTime:1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
 (kafka.request.logger)
[2017-03-13 10:06:24,406] TRACE Completed 
request:org.apache.kafka.common.requests.RequestHeader@133b1de8 -- 
org.apache.kafka.common.requests.UpdateMetadataRequest@795826d from connection 
127.0.0.1:9090-127.0.0.1:50969;totalTime:3,requestQueueTime:0,localTime:2,remoteTime:0,responseQueueTime:0,sendTime:1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
 (kafka.request.logger)
[2017-03-13 10:06:24,943] TRACE Completed 
request:org.apache.kafka.common.requests.RequestHeader@76a9acaa -- 
org.apache.kafka.common.requests.FetchRequest@33ab3c1b from connection 
127.0.0.1:9090-127.0.0.1:50976;totalTime:522,requestQueueTime:0,localTime:13,remoteTime:506,responseQueueTime:0,sendTime:1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
 (kafka.request.logger)
{code}

Both the headers and requests have regressed to just show object ids instead of 
their contents from their underlying structs. I'm guessing this regression came 
from commit 
[fc1cfe475e8ae8458d8ddf119ce18d0c64653a70|https://github.com/apache/kafka/commit/fc1cfe475e8ae8458d8ddf119ce18d0c64653a70]

The logs should look something like this:
{code}
[2017-03-13 10:14:36,754] TRACE Completed 
request:{api_key=4,api_version=0,correlation_id=2,client_id=0} -- 
{controller_id=0,controller_epoch=1,partition_states=[{topic=t,partition=5,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=4,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=3,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=2,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=1,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=0,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=7,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=6,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]}],live_leaders=[{id=0,host=localhost,port=9090},{id=1,host=localhost,port=9091}]}
 from connection 
127.0.0.1:9090-127.0.0.1:51349;totalTime:155,requestQueueTime:0,localTime:155,remoteTime:0,responseQueueTime:0,sendTime:0,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
 (kafka.request.logger)
[2017-03-13 10:14:36,758] TRACE Completed 
request:{api_key=6,api_version=3,correlation_id=3,client_id=0} -- 
{controller_id=0,controller_epoch=1,partition_states=[{topic=t,partition=5,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=4,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=3,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=2,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=1,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=0,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=7,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=6,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]}],live_brokers=[{id=1,end_points=[{port=9091,host=localhost,listener_name=PLAINTEXT,security_protocol_type=0}],rack=null},{id=0,end_points=[{port=9090,host=localhost,listener_name=PLAINTEXT,security_protocol_type=0}],rack=null}]}
 from connection 
127.0.0.1:9090-127.0.0.1:51349;totalTime:3,requestQueueTime:1,localTime:2,remoteTime:0,responseQueueTime:0,sendTime:0,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
 (kafka.request.logger)
[2017-03-13 10:14:37,297] TRACE Completed 
request:{api_key=1,api_version=3,correlation_id=0,client_id=ReplicaFetcherThread-0-0}
 -- 

[jira] [Commented] (KAFKA-1895) Investigate moving deserialization and decompression out of KafkaConsumer

2017-02-22 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15879561#comment-15879561
 ] 

Onur Karaman commented on KAFKA-1895:
-

I think it's worth defining the relation between the two problems mentioned 
earlier:
# no means of access to raw FetchResponse data
# lack of a separate IO thread

I think Problem 1 is more of a performance problem while Problem 2 is a 
performance and usability problem (KAFKA-4753 shows that this can lead to 
starvation).

Addressing Problem 1 doesn't solve Problem 2.

Addressing Problem 2 partially solves Problem 1. With a solution to Problem 2, 
we have the potential to also do the decompression/deserialization in the 
separate IO thread, removing decompression-in-user-thread performance concerns. 
But this wouldn't address the decompression-then-recompression performance 
concerns in MirrorMaker or perhaps some stream processing use-cases.

I think we need to solve both problems.

> Investigate moving deserialization and decompression out of KafkaConsumer
> -
>
> Key: KAFKA-1895
> URL: https://issues.apache.org/jira/browse/KAFKA-1895
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Jay Kreps
>
> The consumer implementation in KAFKA-1760 decompresses fetch responses and 
> deserializes them into ConsumerRecords which are then handed back as the 
> result of poll().
> There are several downsides to this:
> 1. It is impossible to scale serialization and decompression work beyond the 
> single thread running the KafkaConsumer.
> 2. The results can come back during the processing of other calls such as 
> commit() etc which can result in caching these records a little longer.
> An alternative would be to have ConsumerRecords wrap the actual compressed 
> serialized MemoryRecords chunks and do the deserialization during iteration. 
> This way you could scale this over a thread pool if needed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4753) KafkaConsumer susceptible to FetchResponse starvation

2017-02-22 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15879558#comment-15879558
 ] 

Onur Karaman commented on KAFKA-4753:
-

[~ijuma] I don't think mitigation is the way to go.

The two solutions I can think of are to either:
1. introduce some sort of fair scheduling logic that polls against all brokers 
but only returns records to the user by waiting in a round-robin manner for a 
completed FetchResponse from a given broker. This would damage performance 
since you're blocking the processing of fully formed FetchResponses on the 
slowest fetch.

2. do all IO in a separate thread. I think we've been gravitating towards this 
since the beginning.
- January 2015: In KAFKA-1760, KafkaConsumer implemented as being entirely 
single-threaded with user-driven fetches and heartbeats.
- July 2015: I sent out an 
[email|https://www.mail-archive.com/dev@kafka.apache.org/msg31447.html] stating 
concerns with this approach, including the user-driven fetches and heartbeats. 
Becket and I proposed a [rough design for the 
fix|https://cwiki.apache.org/confluence/display/KAFKA/New+consumer+API+change+proposal]
 back then. Some nice API tweaks were made as a result of the discussion but 
the user-driven fetches and heartbeats remained.
- October 2015: In KAFKA-2397 I added LeaveGroupRequest to reduce the 
turnaround from a controlled client shutdown and rebalancing the group.
- February 2016: In KIP-62 (KAFKA-3888) we added "max.poll.records" in KIP-41 
to mitigate the impact of record processing on the user-driven heartbeats.
- August 2016: We added a separate heartbeat thread, synchronization logic 
surrounding NetworkClient, and retained poll-based liveness by automatically 
sending out LeaveGroupRequest when "max.poll.interval.ms" wasn't honored.
- February 2017: Some of the simplest uses of KafkaConsumer with manual 
partition assignment suffer from starvation again because of the user-driven 
fetch design decision.

I think we should try to make solution 2 work. We can retain the poll-based 
liveness by continuing what we do today: automatic sending of LeaveGroupRequest 
when "max.poll.interval.ms" isn't honored. This fix could also help KAFKA-1895 
as well, as we can potentially push decompression/deserialization in the 
separate IO thread.
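
For illustration only, here is a sketch of the shape solution 2 could take, with stand-in types (none of these exist in the clients today): a dedicated IO thread keeps every broker's fetch in flight and hands completed batches to the application through a queue, so poll() never drives network IO itself:
{code}
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.collection.JavaConverters._

// Stand-in for a fully formed FetchResponse's records from one broker.
case class CompletedFetch(broker: Int, records: Seq[String])

class IoThreadConsumerSketch(brokers: Seq[Int],
                             fetchFromBroker: Int => Option[CompletedFetch]) {
  private val completedFetches = new LinkedBlockingQueue[CompletedFetch]()
  @volatile private var running = true

  // The IO thread keeps every broker's fetch progressing regardless of the user thread.
  private val ioThread = new Thread(new Runnable {
    def run(): Unit =
      while (running)
        brokers.foreach(b => fetchFromBroker(b).foreach(completedFetches.put))
  }, "consumer-io-thread")
  ioThread.setDaemon(true)
  ioThread.start()

  // poll() only drains already-completed fetches; it never performs network IO.
  def poll(timeoutMs: Long): Seq[CompletedFetch] = {
    val first = completedFetches.poll(timeoutMs, TimeUnit.MILLISECONDS)
    if (first == null) Seq.empty
    else {
      val rest = new java.util.ArrayList[CompletedFetch]()
      completedFetches.drainTo(rest)
      first +: rest.asScala.toList
    }
  }

  def close(): Unit = { running = false; ioThread.join(1000) }
}
{code}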

> KafkaConsumer susceptible to FetchResponse starvation
> -
>
> Key: KAFKA-4753
> URL: https://issues.apache.org/jira/browse/KAFKA-4753
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> FetchResponse starvation here means that the KafkaConsumer repeatedly fails 
> to fully form FetchResponses within the request timeout from a subset of the 
> brokers its fetching from while FetchResponses from the other brokers can get 
> fully formed and processed by the application.
> In other words, this ticket is concerned with scenarios where fetching from 
> some brokers hurts the progress of fetching from other brokers to the point 
> of repeatedly hitting a request timeout.
> Some FetchResponse starvation scenarios:
> 1. partition leadership of the consumer's assigned partitions is skewed 
> across brokers, causing uneven FetchResponse sizes across brokers.
> 2. the consumer seeks back on partitions on some brokers but not others, 
> causing uneven FetchResponse sizes across brokers.
> 3. the consumer's ability to keep up with various partitions across brokers 
> is skewed, causing uneven FetchResponse sizes across brokers.
> I've personally seen scenario 1 happen this past week to one of our users in 
> prod. They manually assigned partitions such that a few brokers led most of 
> the partitions while other brokers only led a single partition. When 
> NetworkClient sends out FetchRequests to different brokers in parallel with 
> an uneven partition distribution, FetchResponses from brokers who lead more 
> partitions will contain more data than FetchResponses from brokers who lead 
> few partitions. This means the small FetchResponses will get fully formed 
> quicker than larger FetchResponses. When the application eventually consumes 
> a smaller fully formed FetchResponses, the NetworkClient will send out a new 
> FetchRequest to the lightly-loaded broker. Their response will again come 
> back quickly while only marginal progress has been made on the larger 
> FetchResponse. Repeat this process several times and your application will 
> have potentially processed many smaller FetchResponses while the larger 
> FetchResponse made minimal progress and is forced to timeout, causing the 
> large FetchResponse to start all over again, which causes starvation.
> To mitigate the problem for the short term, I've suggested to our user that 
> they either:
> 1. bump up their "receive.buffer.bytes" beyond the current 

[jira] [Commented] (KAFKA-4757) Improve NetworkClient trace logging of request details

2017-02-17 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15872616#comment-15872616
 ] 

Onur Karaman commented on KAFKA-4757:
-

Left some comments on the PR.

Speaking of which, usually the PR itself should appear on the jira ticket.

I wonder if it's because the PR title isn't in the KAFKA-WXYZ:title format as 
defined here:
https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes

> Improve NetworkClient trace logging of request details
> --
>
> Key: KAFKA-4757
> URL: https://issues.apache.org/jira/browse/KAFKA-4757
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Colin P. McCabe
>
> Two issues here:
> 1. Here's what NetworkClient now shows when processing a disconnection:
> {code}
> [2017-02-10 10:48:57,052] TRACE Cancelled request 
> org.apache.kafka.clients.NetworkClient$InFlightRequest@52f759d7 due to node 0 
> being disconnected (org.apache.kafka.clients.NetworkClient)
> {code}
> The log at one point was useful and actually showed the contents of the 
> request. For instance, with FetchRequest, you used to be able to see which 
> partitions were requested as well as the offsets and max bytes requested per 
> partition.
> It looks like InFlightRequest itself doesn't actually hold the request but 
> instead currently just holds the RequestHeader. We probably want to make 
> InFlightRequest hold the entire request to make the original request show up 
> in the logs.
> 2. Sometimes I see the following log:
> {code}
> [2017-02-10 10:53:59,015] TRACE Sending {} to node 0. 
> (org.apache.kafka.clients.NetworkClient)
> {code}
> Again not very insightful.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4757) NetworkClient should log request details at trace level when a request is cancelled because of disconnection

2017-02-17 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15872390#comment-15872390
 ] 

Onur Karaman commented on KAFKA-4757:
-

Ah. Might've found it: Here's what ApiVersionsRequest schema looks like in 
org.apache.kafka.common.protocol.Protocol:
{code}
public static final Schema API_VERSIONS_REQUEST_V0 = new Schema();
{code}

> NetworkClient should log request details at trace level when a request is 
> cancelled because of disconnection
> 
>
> Key: KAFKA-4757
> URL: https://issues.apache.org/jira/browse/KAFKA-4757
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Colin P. McCabe
>
> Two issues here:
> 1. Here's what NetworkClient now shows when processing a disconnection:
> {code}
> [2017-02-10 10:48:57,052] TRACE Cancelled request 
> org.apache.kafka.clients.NetworkClient$InFlightRequest@52f759d7 due to node 0 
> being disconnected (org.apache.kafka.clients.NetworkClient)
> {code}
> The log at one point was useful and actually showed the contents of the 
> request. For instance, with FetchRequest, you used to be able to see which 
> partitions were requested as well as the offsets and max bytes requested per 
> partition.
> It looks like InFlightRequest itself doesn't actually hold the request but 
> instead currently just holds the RequestHeader. We probably want to make 
> InFlightRequest hold the entire request to make the original request show up 
> in the logs.
> 2. Sometimes I see the following log:
> {code}
> [2017-02-10 10:53:59,015] TRACE Sending {} to node 0. 
> (org.apache.kafka.clients.NetworkClient)
> {code}
> Again not very insightful.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4757) NetworkClient should log request details at trace level when a request is cancelled because of disconnection

2017-02-17 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15872284#comment-15872284
 ] 

Onur Karaman commented on KAFKA-4757:
-

[~cmccabe] I double checked and the 2nd logging issue still appears after 
KAFKA-4635. My trunk is currently at git hash 
2c91b324d4798138dd479f54269bdc4f39339817.

My initial guess is that it's related to ApiVersionsRequest, as the line 
following the bad log line usually contains a completed receive of api_key=18:
{code}
[2017-02-17 10:38:40,312] TRACE Sending {} to node 1. 
(org.apache.kafka.clients.NetworkClient)
[2017-02-17 10:38:40,312] TRACE Completed receive from node 1, for key 18, 
received 
{error_code=0,api_versions=[{api_key=0,min_version=0,max_version=2},{api_key=1,min_version=0,max_version=3},{api_key=2,min_version=0,max_version=1},{api_key=3,min_version=0,max_version=2},{api_key=4,min_version=0,max_version=0},{api_key=5,min_version=0,max_version=0},{api_key=6,min_version=0,max_version=3},{api_key=7,min_version=1,max_version=1},{api_key=8,min_version=0,max_version=2},{api_key=9,min_version=0,max_version=2},{api_key=10,min_version=0,max_version=0},{api_key=11,min_version=0,max_version=1},{api_key=12,min_version=0,max_version=0},{api_key=13,min_version=0,max_version=0},{api_key=14,min_version=0,max_version=0},{api_key=15,min_version=0,max_version=0},{api_key=16,min_version=0,max_version=0},{api_key=17,min_version=0,max_version=0},{api_key=18,min_version=0,max_version=0},{api_key=19,min_version=0,max_version=1},{api_key=20,min_version=0,max_version=0}]}
 (org.apache.kafka.clients.NetworkClient)
{code}

> NetworkClient should log request details at trace level when a request is 
> cancelled because of disconnection
> 
>
> Key: KAFKA-4757
> URL: https://issues.apache.org/jira/browse/KAFKA-4757
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Colin P. McCabe
>
> Two issues here:
> 1. Here's what NetworkClient now shows when processing a disconnection:
> {code}
> [2017-02-10 10:48:57,052] TRACE Cancelled request 
> org.apache.kafka.clients.NetworkClient$InFlightRequest@52f759d7 due to node 0 
> being disconnected (org.apache.kafka.clients.NetworkClient)
> {code}
> The log at one point was useful and actually showed the contents of the 
> request. For instance, with FetchRequest, you used to be able to see which 
> partitions were requested as well as the offsets and max bytes requested per 
> partition.
> It looks like InFlightRequest itself doesn't actually hold the request but 
> instead currently just holds the RequestHeader. We probably want to make 
> InFlightRequest hold the entire request to make the original request show up 
> in the logs.
> 2. Sometimes I see the following log:
> {code}
> [2017-02-10 10:53:59,015] TRACE Sending {} to node 0. 
> (org.apache.kafka.clients.NetworkClient)
> {code}
> Again not very insightful.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4676) Kafka consumers gets stuck for some partitions

2017-02-17 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15872213#comment-15872213
 ] 

Onur Karaman commented on KAFKA-4676:
-

This ticket never really found a root cause so it might not be the same problem 
as the starvation problem mentioned in KAFKA-4753 but it'd be convenient to 
link them together nonetheless.

> Kafka consumers gets stuck for some partitions
> --
>
> Key: KAFKA-4676
> URL: https://issues.apache.org/jira/browse/KAFKA-4676
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Vishal Shukla
>Priority: Critical
>  Labels: consumer, reliability
> Attachments: restart-node2-consumer-node-1.log, 
> restart-node2-consumer-node-2.log, stuck-case2.log, 
> stuck-consumer-node-1.log, stuck-consumer-node-2.log, 
> stuck-topic-thread-dump.log
>
>
> We recently upgraded to Kafka 0.10.1.0. We are frequently facing an issue where 
> Kafka consumers get stuck suddenly for some partitions.
> Attached thread dump.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4676) Kafka consumers gets stuck for some partitions

2017-02-17 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15872199#comment-15872199
 ] 

Onur Karaman commented on KAFKA-4676:
-

[~Narendra Kumar] if you're repeatedly seeing the request timeout coming from 
the same subset of brokers, you might be hitting the issue I described in 
KAFKA-4753. I mention a few tricks to mitigate the issue in the ticket. I think 
I have a few solutions to the problem but they're more involved.

> Kafka consumers gets stuck for some partitions
> --
>
> Key: KAFKA-4676
> URL: https://issues.apache.org/jira/browse/KAFKA-4676
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Vishal Shukla
>Priority: Critical
>  Labels: consumer, reliability
> Attachments: restart-node2-consumer-node-1.log, 
> restart-node2-consumer-node-2.log, stuck-case2.log, 
> stuck-consumer-node-1.log, stuck-consumer-node-2.log, 
> stuck-topic-thread-dump.log
>
>
> We recently upgraded to Kafka 0.10.1.0. We are frequently facing issue that 
> Kafka consumers get stuck suddenly for some partitions.
> Attached thread dump.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4757) NetworkClient should log request details at trace level when a request is cancelled because of disconnection

2017-02-17 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15872194#comment-15872194
 ] 

Onur Karaman commented on KAFKA-4757:
-

Thanks [~cmccabe]. Regarding the second issue, I'll need to double check if I 
already pulled in the patch.

> NetworkClient should log request details at trace level when a request is 
> cancelled because of disconnection
> 
>
> Key: KAFKA-4757
> URL: https://issues.apache.org/jira/browse/KAFKA-4757
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Colin P. McCabe
>
> Two issues here:
> 1. Here's what NetworkClient now shows when processing a disconnection:
> {code}
> [2017-02-10 10:48:57,052] TRACE Cancelled request 
> org.apache.kafka.clients.NetworkClient$InFlightRequest@52f759d7 due to node 0 
> being disconnected (org.apache.kafka.clients.NetworkClient)
> {code}
> The log at one point was useful and actually showed the contents of the 
> request. For instance, with FetchRequest, you used to be able to see which 
> partitions were requested as well as the offsets and max bytes requested per 
> partition.
> It looks like InFlightRequest itself doesn't actually hold the request but 
> instead currently just holds the RequestHeader. We probably want to make 
> InFlightRequest hold the entire request to make the original request show up 
> in the logs.
> 2. Sometimes I see the following log:
> {code}
> [2017-02-10 10:53:59,015] TRACE Sending {} to node 0. 
> (org.apache.kafka.clients.NetworkClient)
> {code}
> Again not very insightful.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4757) NetworkClient trace level logging regressions

2017-02-10 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-4757:
---

 Summary: NetworkClient trace level logging regressions
 Key: KAFKA-4757
 URL: https://issues.apache.org/jira/browse/KAFKA-4757
 Project: Kafka
  Issue Type: Bug
Reporter: Onur Karaman


Two issues here:
1. Here's what NetworkClient now shows when processing a disconnection:
{code}
[2017-02-10 10:48:57,052] TRACE Cancelled request 
org.apache.kafka.clients.NetworkClient$InFlightRequest@52f759d7 due to node 0 
being disconnected (org.apache.kafka.clients.NetworkClient)
{code}

The log at one point was useful and actually showed the contents of the 
request. For instance, with FetchRequest, you used to be able to see which 
partitions were requested as well as the offsets and max bytes requested per 
partition.

It looks like InFlightRequest itself doesn't actually hold the request but 
instead currently just holds the RequestHeader. We probably want to make 
InFlightRequest hold the entire request to make the original request show up in 
the logs.

2. Sometimes I see the following log:
{code}
[2017-02-10 10:53:59,015] TRACE Sending {} to node 0. 
(org.apache.kafka.clients.NetworkClient)
{code}
Again not very insightful.
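
For the first issue, a minimal sketch of the idea (class and field names here are 
assumptions, not the actual NetworkClient internals, where InFlightRequest is a 
private nested class): keep a reference to the request body alongside the header 
so the cancellation log can print the request contents rather than the object 
identity.
{code}
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.RequestHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative only: shows the shape of the proposed change, not Kafka's real class.
final class InFlightRequestSketch {
    private static final Logger log = LoggerFactory.getLogger(InFlightRequestSketch.class);

    final RequestHeader header;     // what the current code keeps
    final AbstractRequest request;  // proposed: also keep the body for logging

    InFlightRequestSketch(RequestHeader header, AbstractRequest request) {
        this.header = header;
        this.request = request;
    }

    void logCancelled(int nodeId) {
        // Logging the request body restores the old detail, e.g. the partitions,
        // offsets and max bytes requested by a FetchRequest.
        log.trace("Cancelled request {} with header {} due to node {} being disconnected",
                request, header, nodeId);
    }
}
{code}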



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4753) KafkaConsumer susceptible to FetchResponse starvation

2017-02-09 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15860872#comment-15860872
 ] 

Onur Karaman commented on KAFKA-4753:
-

It might mitigate the issue to some extent for users whose consumers participate 
in group coordination, but it wouldn't help with manual partition assignment 
since the assignor would never get involved.

It's also not a full solution to the problem. Users can still experience the 
other scenarios. We even give users the API to seek to wherever they'd like, 
indicating that doing so should just work.

> KafkaConsumer susceptible to FetchResponse starvation
> -
>
> Key: KAFKA-4753
> URL: https://issues.apache.org/jira/browse/KAFKA-4753
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> FetchResponse starvation here means that the KafkaConsumer repeatedly fails 
> to fully form FetchResponses within the request timeout from a subset of the 
> brokers it's fetching from while FetchResponses from the other brokers can get 
> fully formed and processed by the application.
> In other words, this ticket is concerned with scenarios where fetching from 
> some brokers hurts the progress of fetching from other brokers to the point 
> of repeatedly hitting a request timeout.
> Some FetchResponse starvation scenarios:
> 1. partition leadership of the consumer's assigned partitions is skewed 
> across brokers, causing uneven FetchResponse sizes across brokers.
> 2. the consumer seeks back on partitions on some brokers but not others, 
> causing uneven FetchResponse sizes across brokers.
> 3. the consumer's ability to keep up with various partitions across brokers 
> is skewed, causing uneven FetchResponse sizes across brokers.
> I've personally seen scenario 1 happen this past week to one of our users in 
> prod. They manually assigned partitions such that a few brokers led most of 
> the partitions while other brokers only led a single partition. When 
> NetworkClient sends out FetchRequests to different brokers in parallel with 
> an uneven partition distribution, FetchResponses from brokers who lead more 
> partitions will contain more data than FetchResponses from brokers who lead 
> few partitions. This means the small FetchResponses will get fully formed 
> quicker than larger FetchResponses. When the application eventually consumes 
> a smaller fully formed FetchResponses, the NetworkClient will send out a new 
> FetchRequest to the lightly-loaded broker. Their response will again come 
> back quickly while only marginal progress has been made on the larger 
> FetchResponse. Repeat this process several times and your application will 
> have potentially processed many smaller FetchResponses while the larger 
> FetchResponse made minimal progress and is forced to timeout, causing the 
> large FetchResponse to start all over again, which causes starvation.
> To mitigate the problem for the short term, I've suggested to our user that 
> they either:
> 1. bump up their "receive.buffer.bytes" beyond the current default of 64 KB 
> to something like 1 MB. This is the short-term solution I suggested they go 
> with.
> 2. reduce the "max.partition.fetch.bytes" down from the current default of 1 
> MB to something like 100 KB. This solution wasn't advised as it could impact 
> broker performance.
> 3. ask our SRE's to run a partition reassignment to balance out the partition 
> leadership (partitions were already being led by their preferred leaders).
> 4. bump up their request timeout. It was set to open-source's former default 
> of 40 seconds.
> Contributing factors:
> 1. uneven FetchResponse sizes across brokers.
> 2. processing time of the polled ConsumerRecords.
> 3. "max.poll.records" increases the number of polls needed to consume a 
> FetchResponse, making constant-time overhead per poll magnified.
> 4. "max.poll.records" makes KafkaConsumer.poll bypass calls to 
> ConsumerNetworkClient.poll.
> 5. java.nio.channels.Selector.select, Selector.poll, NetworkClient.poll, and 
> ConsumerNetworkClient.poll can return before the poll timeout as soon as a 
> single channel is selected.
> 6. NetworkClient.poll is solely driven by the user thread with manual 
> partition assignment.
> So far I've only locally reproduced starvation scenario 1 and haven't even 
> attempted the other scenarios. Preventing the bypass of 
> ConsumerNetworkClient.poll (contributing factor 4) mitigates the issue, but 
> it seems starvation would still be possible.
> How to reproduce starvation scenario 1:
> 1. startup zookeeper
> 2. startup two brokers
> 3. create a topic t0 with two partitions led by broker 0 and create a topic 
> t1 with a single partition led by broker 1
> {code}
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t0 
> > 

[jira] [Commented] (KAFKA-4753) KafkaConsumer susceptible to FetchResponse starvation

2017-02-09 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15860366#comment-15860366
 ] 

Onur Karaman commented on KAFKA-4753:
-

One way to mitigate but not fully solve the starvation problem is to make sure 
that KafkaConsumer.poll always triggers the java.nio.channels.Selector.

For instance, we can get rid of the bypass in pollOnce:
{code}
-        // if data is available already, return it immediately
-        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
-        if (!records.isEmpty())
-            return records;
-
{code}

Alternatively, we can add some logic in KafkaConsumer.poll:
{code}
            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
            if (!records.isEmpty()) {
                // before returning the fetched records, we can send off the next round of fetches
                // and avoid block waiting for their responses to enable pipelining while the user
                // is handling the fetched records.
                //
                // NOTE: since the consumed position has already been updated, we must not allow
                // wakeups or any other errors to be triggered prior to returning the fetched records.
-               if (fetcher.sendFetches() > 0) {
+               if (fetcher.sendFetches() > 0 || client.pendingRequestCount() > 0) {
                    client.pollNoWakeup();
                }

                if (this.interceptors == null)
                    return new ConsumerRecords<>(records);
                else
                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }
{code}

I'm not crazy about these solutions as they only mitigate the problem and don't 
solve it.

> KafkaConsumer susceptible to FetchResponse starvation
> -
>
> Key: KAFKA-4753
> URL: https://issues.apache.org/jira/browse/KAFKA-4753
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> FetchResponse starvation here means that the KafkaConsumer repeatedly fails 
> to fully form FetchResponses within the request timeout from a subset of the 
> brokers it's fetching from while FetchResponses from the other brokers can get 
> fully formed and processed by the application.
> In other words, this ticket is concerned with scenarios where fetching from 
> some brokers hurts the progress of fetching from other brokers to the point 
> of repeatedly hitting a request timeout.
> Some FetchResponse starvation scenarios:
> 1. partition leadership of the consumer's assigned partitions is skewed 
> across brokers, causing uneven FetchResponse sizes across brokers.
> 2. the consumer seeks back on partitions on some brokers but not others, 
> causing uneven FetchResponse sizes across brokers.
> 3. the consumer's ability to keep up with various partitions across brokers 
> is skewed, causing uneven FetchResponse sizes across brokers.
> I've personally seen scenario 1 happen this past week to one of our users in 
> prod. They manually assigned partitions such that a few brokers led most of 
> the partitions while other brokers only led a single partition. When 
> NetworkClient sends out FetchRequests to different brokers in parallel with 
> an uneven partition distribution, FetchResponses from brokers who lead more 
> partitions will contain more data than FetchResponses from brokers who lead 
> few partitions. This means the small FetchResponses will get fully formed 
> quicker than larger FetchResponses. When the application eventually consumes 
> a smaller, fully formed FetchResponse, the NetworkClient will send out a new 
> FetchRequest to the lightly-loaded broker. Their response will again come 
> back quickly while only marginal progress has been made on the larger 
> FetchResponse. Repeat this process several times and your application will 
> have potentially processed many smaller FetchResponses while the larger 
> FetchResponse made minimal progress and is forced to timeout, causing the 
> large FetchResponse to start all over again, which causes starvation.
> To mitigate the problem for the short term, I've suggested to our user that 
> they either:
> 1. bump up their "receive.buffer.bytes" beyond the current default of 64 KB 
> to something like 1 MB. This is the short-term solution I suggested they go 
> with.
> 2. reduce the "max.partition.fetch.bytes" down from the current default of 1 
> MB to something like 100 KB. This solution wasn't advised as it could impact 
> broker performance.
> 3. ask our SRE's to run a partition reassignment to balance out the partition 
> leadership (partitions were already being led by their 

[jira] [Created] (KAFKA-4753) KafkaConsumer susceptible to FetchResponse starvation

2017-02-09 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-4753:
---

 Summary: KafkaConsumer susceptible to FetchResponse starvation
 Key: KAFKA-4753
 URL: https://issues.apache.org/jira/browse/KAFKA-4753
 Project: Kafka
  Issue Type: Bug
Reporter: Onur Karaman
Assignee: Onur Karaman


FetchResponse starvation here means that the KafkaConsumer repeatedly fails to 
fully form FetchResponses within the request timeout from a subset of the 
brokers it's fetching from while FetchResponses from the other brokers can get 
fully formed and processed by the application.

In other words, this ticket is concerned with scenarios where fetching from 
some brokers hurts the progress of fetching from other brokers to the point of 
repeatedly hitting a request timeout.

Some FetchResponse starvation scenarios:
1. partition leadership of the consumer's assigned partitions is skewed across 
brokers, causing uneven FetchResponse sizes across brokers.
2. the consumer seeks back on partitions on some brokers but not others, 
causing uneven FetchResponse sizes across brokers.
3. the consumer's ability to keep up with various partitions across brokers is 
skewed, causing uneven FetchResponse sizes across brokers.

I've personally seen scenario 1 happen this past week to one of our users in 
prod. They manually assigned partitions such that a few brokers led most of the 
partitions while other brokers only led a single partition. When NetworkClient 
sends out FetchRequests to different brokers in parallel with an uneven 
partition distribution, FetchResponses from brokers who lead more partitions 
will contain more data than FetchResponses from brokers who lead few 
partitions. This means the small FetchResponses will get fully formed quicker 
than larger FetchResponses. When the application eventually consumes a smaller 
fully formed FetchResponse, the NetworkClient will send out a new FetchRequest 
to the lightly-loaded broker. Their response will again come back quickly while 
only marginal progress has been made on the larger FetchResponse. Repeat this 
process several times and your application will have potentially processed many 
smaller FetchResponses while the larger FetchResponse made minimal progress and 
is forced to timeout, causing the large FetchResponse to start all over again, 
which causes starvation.

To mitigate the problem for the short term, I've suggested to our user that 
they either:
1. bump up their "receive.buffer.bytes" beyond the current default of 64 KB to 
something like 1 MB. This is the short-term solution I suggested they go with 
(see the config sketch after this list).
2. reduce the "max.partition.fetch.bytes" down from the current default of 1 MB 
to something like 100 KB. This solution wasn't advised as it could impact 
broker performance.
3. ask our SRE's to run a partition reassignment to balance out the partition 
leadership (partitions were already being led by their preferred leaders).
4. bump up their request timeout. It was set to open-source's former default of 
40 seconds.
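
A minimal sketch (not part of the original ticket) of the config overrides behind 
options 1, 2 and 4, assuming the new Java consumer; the broker addresses and 
exact values are illustrative:
{code}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class StarvationMitigationConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9090,localhost:9091");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "starvation-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // option 1: larger socket receive buffer so big FetchResponses drain faster (default 64 KB)
        props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
        // option 2 (not advised above): smaller per-partition fetch size (default 1 MB)
        // props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 100 * 1024);
        // option 4: request timeout larger than the former 40 second default
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 120000);

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        consumer.close();
    }
}
{code}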

Contributing factors:
1. uneven FetchResponse sizes across brokers.
2. processing time of the polled ConsumerRecords.
3. "max.poll.records" increases the number of polls needed to consume a 
FetchResponse, making constant-time overhead per poll magnified.
4. "max.poll.records" makes KafkaConsumer.poll bypass calls to 
ConsumerNetworkClient.poll.
5. java.nio.channels.Selector.select, Selector.poll, NetworkClient.poll, and 
ConsumerNetworkClient.poll can return before the poll timeout as soon as a 
single channel is selected.
6. NetworkClient.poll is solely driven by the user thread with manual partition 
assignment.

So far I've only locally reproduced starvation scenario 1 and haven't even 
attempted the other scenarios. Preventing the bypass of 
ConsumerNetworkClient.poll (contributing factor 4) mitigates the issue, but it 
seems starvation would still be possible.

How to reproduce starvation scenario 1:
1. startup zookeeper
2. startup two brokers
3. create a topic t0 with two partitions led by broker 0 and create a topic t1 
with a single partition led by broker 1
{code}
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t0 
> --replica-assignment 0,0
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t1 
> --replica-assignment 1
{code}
4. Produce a lot of data into these topics
{code}
> ./bin/kafka-producer-perf-test.sh --topic t0 --num-records 2000 
> --record-size 100 --throughput 10 --producer-props 
> bootstrap.servers=localhost:9090,localhost:9091
> ./bin/kafka-producer-perf-test.sh --topic t1 --num-records 1000 
> --record-size 100 --throughput 10 --producer-props 
> bootstrap.servers=localhost:9090,localhost:9091
{code}
5. startup a consumer that consumes these 3 partitions with TRACE level 
NetworkClient logging
{code}
> ./bin/kafka-run-class.sh 
> 

[jira] [Created] (KAFKA-4749) fix join-time-max and sync-time-max MeasurableStat type

2017-02-08 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-4749:
---

 Summary: fix join-time-max and sync-time-max MeasurableStat type
 Key: KAFKA-4749
 URL: https://issues.apache.org/jira/browse/KAFKA-4749
 Project: Kafka
  Issue Type: Bug
Reporter: Onur Karaman
Assignee: Onur Karaman


GroupCoordinatorMetrics currently sets up join-time-max and sync-time-max 
incorrectly as a "new Avg()" MeasurableStat instead of "new Max()"
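
A minimal sketch of the intended behavior using the org.apache.kafka.common.metrics 
API (sensor and metric names here are illustrative, not the exact ones in 
GroupCoordinatorMetrics): the *-time-max metrics should be registered with 
{{new Max()}} while the *-time-avg metrics use {{new Avg()}}.
{code}
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;

public class JoinTimeMetricsSketch {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();
        Sensor joinLatency = metrics.sensor("join-latency");
        joinLatency.add(metrics.metricName("join-time-avg", "consumer-coordinator-metrics",
                "The average time taken for a group rejoin"), new Avg());
        // the bug: this metric was registered as new Avg(); a *-max metric needs new Max()
        joinLatency.add(metrics.metricName("join-time-max", "consumer-coordinator-metrics",
                "The max time taken for a group rejoin"), new Max());

        joinLatency.record(42.0); // record one join latency sample (ms)
        metrics.close();
    }
}
{code}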



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4747) add metrics for KafkaConsumer.poll

2017-02-08 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-4747:
---

 Summary: add metrics for KafkaConsumer.poll
 Key: KAFKA-4747
 URL: https://issues.apache.org/jira/browse/KAFKA-4747
 Project: Kafka
  Issue Type: Improvement
Reporter: Onur Karaman
Assignee: Onur Karaman


KafkaConsumer heavily depends on KafkaConsumer.poll yet we don't have metrics 
directly associated with it.

We probably want to add two metrics:
1. time spent in KafkaConsumer.poll
2. time since last KafkaConsumer.poll (measured as now - endTimeOfLastPoll)
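
A minimal sketch of how the two measurements could be taken, written here as a 
thin wrapper around poll rather than inside the consumer itself; the sensor and 
metric names are assumptions:
{code}
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;

public class PollMetricsSketch {
    private final Metrics metrics = new Metrics();
    private final Sensor pollTime = metrics.sensor("poll-time");
    private final Sensor timeSinceLastPoll = metrics.sensor("time-since-last-poll");
    private long lastPollEndMs = -1L;

    public PollMetricsSketch() {
        pollTime.add(metrics.metricName("poll-time-avg", "consumer-poll-metrics"), new Avg());
        pollTime.add(metrics.metricName("poll-time-max", "consumer-poll-metrics"), new Max());
        timeSinceLastPoll.add(metrics.metricName("time-since-last-poll-max", "consumer-poll-metrics"), new Max());
    }

    // metric 2: time since the last poll ended; metric 1: time spent inside this poll
    public <K, V> ConsumerRecords<K, V> timedPoll(KafkaConsumer<K, V> consumer, long timeoutMs) {
        long start = System.currentTimeMillis();
        if (lastPollEndMs >= 0)
            timeSinceLastPoll.record(start - lastPollEndMs);
        ConsumerRecords<K, V> records = consumer.poll(timeoutMs);
        lastPollEndMs = System.currentTimeMillis();
        pollTime.record(lastPollEndMs - start);
        return records;
    }
}
{code}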



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4513) Support migration of old consumers to new consumers without downtime

2017-02-06 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman reassigned KAFKA-4513:
---

Assignee: Onur Karaman

> Support migration of old consumers to new consumers without downtime
> 
>
> Key: KAFKA-4513
> URL: https://issues.apache.org/jira/browse/KAFKA-4513
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Ismael Juma
>Assignee: Onur Karaman
>
> Some ideas were discussed in the following thread:
> http://markmail.org/message/ovngfw3ibixlquxh



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4704) Group coordinator cache loading fails if groupId is used first for consumer groups and then for simple consumer

2017-01-26 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15840663#comment-15840663
 ] 

Onur Karaman commented on KAFKA-4704:
-

It might be worth mentioning that I hit a similar scenario when implementing 
the new consumer migration. Rolling back from the migration-aware old consumer 
(or just new consumer) to the old consumer with kafka-based offset storage (or 
{{dual.commit.enabled}}) causes the old consumer offset commits to fail 
silently. This is because by that point, the group has been added to the 
{{GroupCoordinator}} and generation id has been incremented to be >= 0.  The 
old consumer, on the other hand, is naively sending {{OffsetCommitRequests}} 
with an empty member id and generation id of -1, so the {{GroupCoordinator}} 
will reject the request with {{UNKNOWN_MEMBER_ID}}.

I have a workaround constraint to address the problem, but I'll leave that for 
when I send out the actual proposal.

> Group coordinator cache loading fails if groupId is used first for consumer 
> groups and then for simple consumer
> ---
>
> Key: KAFKA-4704
> URL: https://issues.apache.org/jira/browse/KAFKA-4704
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
> Fix For: 0.10.2.0
>
>
> When all the members in a consumer group have died and all of its offsets 
> have expired, we write a tombstone to __consumer_offsets so that its group 
> metadata is cleaned up. It is possible that after this happens, the same 
> groupId is then used only for offset storage (i.e. by "simple" consumers). 
> Our current cache loading logic, which is triggered when a coordinator first 
> takes over control of a partition, does not account for this scenario and 
> would currently fail.
> This is probably an unlikely scenario to hit in practice, but it reveals the 
> lack of test coverage around the cache loading logic. We should improve this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3959) __consumer_offsets wrong number of replicas at startup

2017-01-25 Thread Onur Karaman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Onur Karaman updated KAFKA-3959:

Fix Version/s: (was: 0.10.3.0)
   0.10.2.0

> __consumer_offsets wrong number of replicas at startup
> --
>
> Key: KAFKA-3959
> URL: https://issues.apache.org/jira/browse/KAFKA-3959
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, offset manager, replication
>Affects Versions: 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.0.2, 0.10.1.0, 
> 0.10.1.1, 0.10.1.2
> Environment: Brokers of 3 kafka nodes running Red Hat Enterprise 
> Linux Server release 7.2 (Maipo)
>Reporter: Alban Hurtaud
>Assignee: Onur Karaman
>Priority: Blocker
>  Labels: needs-kip, reliability
> Fix For: 0.10.2.0
>
>
> When creating a stack of 3 Kafka brokers, the consumer starts faster than the 
> Kafka nodes, and when it tries to read a topic, only one Kafka node is 
> available.
> So __consumer_offsets is created with a replication factor of 1 (instead of 
> the configured 3):
> offsets.topic.replication.factor=3
> default.replication.factor=3
> min.insync.replicas=2
> Then, when the other Kafka nodes come up, exceptions are thrown because the 
> number of replicas for __consumer_offsets is 1 and min insync replicas is 2.
> What I missed is: why is __consumer_offsets created with a replication factor 
> of 1 (when 1 broker is running) when server.properties sets it to 3?
> To reproduce : 
> - Prepare 3 kafka nodes with the 3 lines above added to servers.properties.
> - Run one kafka,
> - Run one consumer (the __consumer_offsets is created with replicas =1)
> - Run 2 more kafka nodes



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4576) Log segments close to max size break on fetch

2017-01-03 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15794567#comment-15794567
 ] 

Onur Karaman commented on KAFKA-4576:
-

I think you're right, [~huxi_2b]. It looks like we incorrectly assume 
"FileChannel.read" does the full read all throughout the FileMessageSet class.

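A minimal sketch (not the FileMessageSet code itself) of the point above: 
FileChannel.read can return fewer bytes than requested, so a caller that needs a 
complete buffer has to loop until the buffer is full or end-of-file is reached.
{code}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public final class ReadFully {
    // keep reading until 'buffer' is full; a single channel.read call may return early
    static void readFully(FileChannel channel, ByteBuffer buffer, long position) throws IOException {
        long pos = position;
        while (buffer.hasRemaining()) {
            int read = channel.read(buffer, pos);
            if (read < 0)
                throw new IOException("Hit end of file at position " + pos + " before filling buffer");
            pos += read;
        }
    }
}
{code}
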
> Log segments close to max size break on fetch
> -
>
> Key: KAFKA-4576
> URL: https://issues.apache.org/jira/browse/KAFKA-4576
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.1.1
>Reporter: Ivan Babrou
>
> We are running Kafka 0.10.1.1~rc1 (it's the same as 0.10.1.1).
> Max segment size is set to 2147483647 globally, that's 1 byte less than max 
> signed int32.
> Every now and then we see failures like this:
> {noformat}
> Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: ERROR [Replica Manager on 
> Broker 1006]: Error processing fetch operation on partition [mytopic,11], 
> offset 483579108587 (kafka.server.ReplicaManager)
> Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: 
> java.lang.IllegalStateException: Failed to read complete buffer for 
> targetOffset 483686627237 startPosition 2145701130 in 
> /disk/data0/kafka-logs/mytopic-11/483571890786.log
> Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at 
> kafka.log.FileMessageSet.searchForOffsetWithSize(FileMessageSet.scala:145)
> Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at 
> kafka.log.LogSegment.translateOffset(LogSegment.scala:128)
> Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at 
> kafka.log.LogSegment.read(LogSegment.scala:180)
> Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at 
> kafka.log.Log.read(Log.scala:563)
> Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at 
> kafka.server.ReplicaManager.kafka$server$ReplicaManager$$read$1(ReplicaManager.scala:567)
> Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at 
> kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:606)
> Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at 
> kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:605)
> Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at 
> scala.collection.Iterator$class.foreach(Iterator.scala:893)
> Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at 
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at 
> kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:605)
> Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at 
> kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:469)
> Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at 
> kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:534)
> Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at 
> kafka.server.KafkaApis.handle(KafkaApis.scala:79)
> Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at 
> kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
> Dec 25 18:20:47 myhost kafka-run-class.sh[2054]: at 
> java.lang.Thread.run(Thread.java:745)
> {noformat}
> {noformat}
> ...
> -rw-r--r-- 1 kafka kafka  0 Dec 25 15:15 
> 483557418204.timeindex
> -rw-r--r-- 1 kafka kafka   9496 Dec 25 15:26 483564654488.index
> -rw-r--r-- 1 kafka kafka 2145763964 Dec 25 15:26 483564654488.log
> -rw-r--r-- 1 kafka kafka  0 Dec 25 15:26 
> 483564654488.timeindex
> -rw-r--r-- 1 kafka kafka   9576 Dec 25 15:37 483571890786.index
> -rw-r--r-- 1 kafka kafka 2147483644 Dec 25 15:37 483571890786.log
> -rw-r--r-- 1 kafka kafka  0 Dec 25 15:37 
> 483571890786.timeindex
> -rw-r--r-- 1 kafka kafka   9568 Dec 25 15:48 483579135712.index
> -rw-r--r-- 1 kafka kafka 2146791360 Dec 25 15:48 483579135712.log
> -rw-r--r-- 1 kafka kafka  0 Dec 25 15:48 
> 483579135712.timeindex
> -rw-r--r-- 1 kafka kafka   9408 Dec 25 15:59 483586374164.index
> ...
> {noformat}
> Here 483571890786.log is just 3 bytes below the max size.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

