mmliu created KAFKA-410:
---------------------------

             Summary: after been killed, one broker fail to re-join the cluster 
after restarted
                 Key: KAFKA-410
                 URL: https://issues.apache.org/jira/browse/KAFKA-410
             Project: Kafka
          Issue Type: Bug
          Components: core
         Environment: the version of kafka we use is trunk 2a59ad76c6
            Reporter: mmliu


In my kafka cluster ,there are 2 brokers,

One of them was killed by accident yesterday, after that ,I tried to restart 
the broker,but seems that it failed to join the cluster.

In zookeeper,it successfully register itself,but after sending logs, no data 
show up in kafka data directory

This is the log of the broker after restart it:

[2012-07-21 17:34:27,807] DEBUG preRegister called. 
Server=com.sun.jmx.mbeanserver.JmxMBeanServer@6443226, 
name=kafka:type=kafka.KafkaLog4j (root)
[2012-07-21 17:34:27,808] DEBUG Adding AppenderMBean for appender named 
fileAppender (org.apache.log4j.jmx.LoggerDynamicMBean)
[2012-07-21 17:34:27,810] DEBUG getMBeanInfo called. 
(org.apache.log4j.jmx.AppenderDynamicMBean)
[2012-07-21 17:34:27,810] DEBUG preRegister called. 
Server=com.sun.jmx.mbeanserver.JmxMBeanServer@6443226, 
name=log4j:appender=fileAppender (org.apache.log4j.jmx.AppenderDynamicMBean)
[2012-07-21 17:34:27,810] DEBUG Adding 
LayoutMBean:fileAppender,layout=org.apache.log4j.PatternLayout 
(org.apache.log4j.jmx.AppenderDynamicMBean)
[2012-07-21 17:34:27,811] DEBUG getMBeanInfo called. 
(org.apache.log4j.jmx.LayoutDynamicMBean)
[2012-07-21 17:34:27,811] DEBUG preRegister called. 
Server=com.sun.jmx.mbeanserver.JmxMBeanServer@6443226, 
name=log4j:appender=fileAppender,layout=org.apache.log4j.PatternLayout 
(org.apache.log4j.jmx.LayoutDynamicMBean)
[2012-07-21 17:34:27,967] INFO The number of partitions for topic  no_appkey : 
1 (kafka.utils.Utils$)
[2012-07-21 17:34:27,969] INFO The number of partitions for topic   
parse_exception : 1 (kafka.utils.Utils$)
[2012-07-21 17:34:27,972] INFO Starting Kafka server... 
(kafka.server.KafkaServer)
[2012-07-21 17:34:27,984] INFO starting log cleaner every 60000 ms 
(kafka.log.LogManager)
[2012-07-21 17:34:27,990] INFO connecting to ZK: XX.XX.XX.229:2181/kafka 
(kafka.server.KafkaZooKeeper)
[2012-07-21 17:34:27,999] DEBUG Creating new ZookKeeper instance to connect to 
XX.XX.XX.229:2181/kafka. (org.I0Itec.zkclient.ZkConnection)
[2012-07-21 17:34:27,999] INFO Starting ZkClient event thread. 
(org.I0Itec.zkclient.ZkEventThread)
[2012-07-21 17:34:28,006] INFO Client 
environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT 
(org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,006] INFO Client environment:host.name=mobile-1 
(org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,006] INFO Client environment:java.version=1.6.0_17 
(org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,006] INFO Client environment:java.vendor=Sun Microsystems 
Inc. (org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,006] INFO Client 
environment:java.home=/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0.x86_64/jre 
(org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,006] INFO Client 
environment:java.class.path=:/home/Our_Server/kafka/libs/jopt-simple-3.2.jar:/home/Our_Server/kafka/libs/log4j-1.2.15.jar:/home/Our_Server/kafka/libs/scala-compiler.jar:/home/Our_Server/kafka/libs/scala-library.jar:/home/Our_Server/kafka/libs/snappy-java-1.0.4.1.jar:/home/Our_Server/kafka/libs/zkclient-0.1.jar:/home/Our_Server/kafka/libs/zookeeper-3.3.4.jar:/home/Our_Server/kafka/kafka-trunk-2a59ad76c6_scala-2.8.0.jar
 (org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,006] INFO Client 
environment:java.library.path=/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0.x86_64/jre/lib/amd64/server:/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0.x86_64/jre/lib/amd64:/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0.x86_64/jre/../lib/amd64:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
 (org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,006] INFO Client environment:java.io.tmpdir=/tmp 
(org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,006] INFO Client environment:java.compiler=<NA> 
(org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,006] INFO Client environment:os.name=Linux 
(org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,006] INFO Client environment:os.arch=amd64 
(org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,006] INFO Client 
environment:os.version=2.6.32-71.29.1.el6.x86_64 
(org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,006] INFO Client environment:user.name=Our_Server 
(org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,006] INFO Client environment:user.home=/home/Our_Server 
(org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,006] INFO Client 
environment:user.dir=/home/Our_Server/kafka (org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,007] INFO Initiating client connection, 
connectString=XX.XX.XX.229:2181/kafka sessionTimeout=6000 
watcher=org.I0Itec.zkclient.ZkClient@3c6210fb (org.apache.zookeeper.ZooKeeper)
[2012-07-21 17:34:28,009] DEBUG zookeeper.disableAutoWatchReset is false 
(org.apache.zookeeper.ClientCnxn)
[2012-07-21 17:34:28,024] INFO Opening socket connection to server 
/XX.XX.XX.229:2181 (org.apache.zookeeper.ClientCnxn)
[2012-07-21 17:34:28,028] INFO Socket connection established to 
mobile-1/XX.XX.XX.229:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2012-07-21 17:34:28,030] DEBUG Session establishment request sent on 
mobile-1/XX.XX.XX.229:2181 (org.apache.zookeeper.ClientCnxn)
[2012-07-21 17:34:28,048] INFO Session establishment complete on server 
mobile-1/XX.XX.XX.229:2181, sessionid = 0x1389e95409a0b65, negotiated timeout = 
6000 (org.apache.zookeeper.ClientCnxn)
[2012-07-21 17:34:28,049] INFO zookeeper state changed (SyncConnected) 
(org.I0Itec.zkclient.ZkClient)
[2012-07-21 17:34:28,147] INFO Awaiting connections on port 9092 
(kafka.network.Acceptor)
[2012-07-21 17:34:28,149] DEBUG Will try to load MX4j now, if it's in the 
classpath (kafka.utils.Mx4jLoader$)
[2012-07-21 17:34:28,149] INFO Will not load MX4J, mx4j-tools.jar is not in the 
classpath (kafka.utils.Mx4jLoader$)
[2012-07-21 17:34:28,150] INFO Registering broker /brokers/ids/1 
(kafka.server.KafkaZooKeeper)
[2012-07-21 17:34:28,173] DEBUG Reading reply sessionid:0x1389e95409a0b65, 
packet:: clientPath:null serverPath:null finished:false header:: 1,1  
replyHeader:: 1,8312179,0  request:: 
'/kafka/brokers/ids/1,#3132302e3139372e38342e3232392d313334323836333236383135303a3132302e3139372e38342e3232393a39303932,v{s{31,s{'world,'anyone}}},1
  response:: '/kafka/brokers/ids/1  (org.apache.zookeeper.ClientCnxn)
[2012-07-21 17:34:28,174] INFO Registering broker /brokers/ids/1 succeeded with 
id:1,creatorId:XX.XX.XX.229-1342863268150,host:XX.XX.XX.229,port:9092 
(kafka.server.KafkaZooKeeper)
[2012-07-21 17:34:28,188] INFO Starting log flusher every 1000 ms with the 
following overrides Map() (kafka.log.LogManager)
[2012-07-21 17:34:28,189] INFO Kafka server started. (kafka.server.KafkaServer)
[2012-07-21 17:34:29,189] DEBUG flushing the high watermark of all logs 
(kafka.log.LogManager)
[2012-07-21 17:34:30,158] DEBUG Got ping response for sessionid: 
0x1389e95409a0b65 after 0ms (org.apache.zookeeper.ClientCnxn)
[2012-07-21 17:34:30,188] DEBUG flushing the high watermark of all logs 
(kafka.log.LogManager)
[2012-07-21 17:34:31,188] DEBUG flushing the high watermark of all logs 
(kafka.log.LogManager)
[2012-07-21 17:34:32,158] DEBUG Got ping response for sessionid: 
0x1389e95409a0b65 after 0ms (org.apache.zookeeper.ClientCnxn)
[2012-07-21 17:34:32,188] DEBUG flushing the high watermark of all logs 
(kafka.log.LogManager)
[2012-07-21 17:34:33,188] DEBUG flushing the high watermark of all logs 
(kafka.log.LogManager)
[2012-07-21 17:34:34,158] DEBUG Got ping response for sessionid: 
0x1389e95409a0b65 after 0ms (org.apache.zookeeper.ClientCnxn)
[2012-07-21 17:34:34,188] DEBUG flushing the high watermark of all logs 
(kafka.log.LogManager)
[2012-07-21 17:34:35,188] DEBUG flushing the high watermark of all logs 
(kafka.log.LogManager)
[2012-07-21 17:34:36,157] DEBUG Got ping response for sessionid: 
0x1389e95409a0b65 after 0ms (org.apache.zookeeper.ClientCnxn)
[2012-07-21 17:34:36,188] DEBUG flushing the high watermark of all logs 
(kafka.log.LogManager)
[2012-07-21 17:34:37,188] DEBUG flushing the high watermark of all logs 
(kafka.log.LogManager)
[2012-07-21 17:34:38,168] DEBUG Got ping response for sessionid: 
0x1389e95409a0b65 after 10ms (org.apache.zookeeper.ClientCnxn)
[2012-07-21 17:34:38,188] DEBUG flushing the high watermark of all logs 
(kafka.log.LogManager)
[2012-07-21 17:34:39,188] DEBUG flushing the high watermark of all logs 
(kafka.log.LogManager)
[2012-07-21 17:34:40,157] DEBUG Got ping response for sessionid: 
0x1389e95409a0b65 after 0ms (org.apache.zookeeper.ClientCnxn)
...
...
...







This is the error from producer before send any data to brokers:

[2012-07-21 20:10:33,217] INFO Starting ZkClient event thread. 
(org.I0Itec.zkclient.ZkEventThread)
[2012-07-21 20:10:33,222] INFO Client 
environment:zookeeper.version=3.3.3-1073969, built on 02/23/2011 22:27 GMT 
(org.apache.zookeeper.ZooKeeper)
[2012-07-21 20:10:33,222] INFO Client environment:host.name=mobile-2 
(org.apache.zookeeper.ZooKeeper)
[2012-07-21 20:10:33,222] INFO Client environment:java.version=1.7.0_02 
(org.apache.zookeeper.ZooKeeper)
[2012-07-21 20:10:33,222] INFO Client environment:java.vendor=Oracle 
Corporation (org.apache.zookeeper.ZooKeeper)
[2012-07-21 20:10:33,222] INFO Client environment:java.home=/opt/jdk1.7.0/jre 
(org.apache.zookeeper.ZooKeeper)
[2012-07-21 20:10:33,222] INFO Client 
environment:java.class.path=/home/our_server_name/apps/our_service/current/our_service_2.9.1-0.2.2.jar
 (org.apache.zookeeper.ZooKeeper)
[2012-07-21 20:10:33,222] INFO Client 
environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
 (org.apache.zookeeper.ZooKeeper)
[2012-07-21 20:10:33,222] INFO Client environment:java.io.tmpdir=/tmp 
(org.apache.zookeeper.ZooKeeper)
[2012-07-21 20:10:33,222] INFO Client environment:java.compiler=<NA> 
(org.apache.zookeeper.ZooKeeper)
[2012-07-21 20:10:33,222] INFO Client environment:os.name=Linux 
(org.apache.zookeeper.ZooKeeper)
[2012-07-21 20:10:33,222] INFO Client environment:os.arch=amd64 
(org.apache.zookeeper.ZooKeeper)
[2012-07-21 20:10:33,223] INFO Client 
environment:os.version=2.6.32-71.el6.x86_64 (org.apache.zookeeper.ZooKeeper)
[2012-07-21 20:10:33,223] INFO Client environment:user.name=our_server_name 
(org.apache.zookeeper.ZooKeeper)
[2012-07-21 20:10:33,223] INFO Client 
environment:user.home=/home/our_server_name (org.apache.zookeeper.ZooKeeper)
[2012-07-21 20:10:33,223] INFO Client 
environment:user.dir=/home/our_server_name/apps/our_service/releases/20120716050825
 (org.apache.zookeeper.ZooKeeper)
[2012-07-21 20:10:33,223] INFO Initiating client connection, 
connectString=XX.XX.XX.229:2181/kafka sessionTimeout=6000 
watcher=org.I0Itec.zkclient.ZkClient@9875096 (org.apache.zookeeper.ZooKeeper)
[2012-07-21 20:10:33,236] INFO Opening socket connection to server 
/XX.XX.XX.229:2181 (org.apache.zookeeper.ClientCnxn)
[2012-07-21 20:10:33,239] INFO Socket connection established to 
mobile-1/XX.XX.XX.229:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2012-07-21 20:10:33,253] INFO Session establishment complete on server 
mobile-1/XX.XX.XX.229:2181, sessionid = 0x1389e95409a0c02, negotiated timeout = 
6000 (org.apache.zookeeper.ClientCnxn)
[2012-07-21 20:10:33,255] INFO zookeeper state changed (SyncConnected) 
(org.I0Itec.zkclient.ZkClient)
[2012-07-21 20:10:33,275] DEBUG Broker ids and # of partitions on each for 
topic: nrt-test = ArrayBuffer((0,32)) (kafka.producer.ZKBrokerPartitionInfo)
[2012-07-21 20:10:33,293] DEBUG Sorted list of broker ids and partition ids on 
each for topic: nrt-test = TreeSet(0-0, 0-1, 0-2, 0-3, 0-4, 0-5, 0-6, 0-7, 0-8, 
0-9, 0-10, 0-11, 0-12, 0-13, 0-14, 0-15, 0-16, 0-17, 0-18, 0-19, 0-20, 0-21, 
0-22, 0-23, 0-24, 0-25, 0-26, 0-27, 0-28, 0-29, 0-30, 0-31) 
(kafka.producer.ZKBrokerPartitionInfo)
[2012-07-21 20:10:33,295] DEBUG Broker ids and # of partitions on each for 
topic: app_log = ArrayBuffer((0,32)) (kafka.producer.ZKBrokerPartitionInfo)
[2012-07-21 20:10:33,297] DEBUG Sorted list of broker ids and partition ids on 
each for topic: app_log = TreeSet(0-0, 0-1, 0-2, 0-3, 0-4, 0-5, 0-6, 0-7, 0-8, 
0-9, 0-10, 0-11, 0-12, 0-13, 0-14, 0-15, 0-16, 0-17, 0-18, 0-19, 0-20, 0-21, 
0-22, 0-23, 0-24, 0-25, 0-26, 0-27, 0-28, 0-29, 0-30, 0-31) 
(kafka.producer.ZKBrokerPartitionInfo)
[2012-07-21 20:10:33,299] DEBUG Broker ids and # of partitions on each for 
topic: install-test = ArrayBuffer((0,32)) (kafka.producer.ZKBrokerPartitionInfo)
[2012-07-21 20:10:33,300] DEBUG Sorted list of broker ids and partition ids on 
each for topic: install-test = TreeSet(0-0, 0-1, 0-2, 0-3, 0-4, 0-5, 0-6, 0-7, 
0-8, 0-9, 0-10, 0-11, 0-12, 0-13, 0-14, 0-15, 0-16, 0-17, 0-18, 0-19, 0-20, 
0-21, 0-22, 0-23, 0-24, 0-25, 0-26, 0-27, 0-28, 0-29, 0-30, 0-31) 
(kafka.producer.ZKBrokerPartitionInfo)
[2012-07-21 20:10:33,302] DEBUG Broker ids and # of partitions on each for 
topic: nrt-test-less = ArrayBuffer((0,32)) 
(kafka.producer.ZKBrokerPartitionInfo)
[2012-07-21 20:10:33,304] DEBUG Sorted list of broker ids and partition ids on 
each for topic: nrt-test-less = TreeSet(0-0, 0-1, 0-2, 0-3, 0-4, 0-5, 0-6, 0-7, 
0-8, 0-9, 0-10, 0-11, 0-12, 0-13, 0-14, 0-15, 0-16, 0-17, 0-18, 0-19, 0-20, 
0-21, 0-22, 0-23, 0-24, 0-25, 0-26, 0-27, 0-28, 0-29, 0-30, 0-31) 
(kafka.producer.ZKBrokerPartitionInfo)
[2012-07-21 20:10:33,306] DEBUG Broker ids and # of partitions on each for 
topic: no_appkey = ArrayBuffer((0,1)) (kafka.producer.ZKBrokerPartitionInfo)
[2012-07-21 20:10:33,306] DEBUG Sorted list of broker ids and partition ids on 
each for topic: no_appkey = TreeSet(0-0) (kafka.producer.ZKBrokerPartitionInfo)
[2012-07-21 20:10:33,308] DEBUG Broker ids and # of partitions on each for 
topic: parse_exception = ArrayBuffer((0,32)) 
(kafka.producer.ZKBrokerPartitionInfo)
[2012-07-21 20:10:33,309] DEBUG Sorted list of broker ids and partition ids on 
each for topic: parse_exception = TreeSet(0-0, 0-1, 0-2, 0-3, 0-4, 0-5, 0-6, 
0-7, 0-8, 0-9, 0-10, 0-11, 0-12, 0-13, 0-14, 0-15, 0-16, 0-17, 0-18, 0-19, 
0-20, 0-21, 0-22, 0-23, 0-24, 0-25, 0-26, 0-27, 0-28, 0-29, 0-30, 0-31) 
(kafka.producer.ZKBrokerPartitionInfo)
[2012-07-21 20:10:33,315] DEBUG [BrokerTopicsListener] Creating broker topics 
listener to watch the following paths - 
/broker/topics, /broker/topics/topic, /broker/ids 
(kafka.producer.ZKBrokerPartitionInfo$BrokerTopicsListener)
[2012-07-21 20:10:33,318] DEBUG [BrokerTopicsListener] Initialized this broker 
topics listener with initial mapping of broker id to partition id per topic 
with Map(app_log -> TreeSet(0-0, 0-1, 0-2, 0-3, 0-4, 0-5, 0-6, 0-7, 0-8, 0-9, 
0-10, 0-11, 0-12, 0-13, 0-14, 0-15, 0-16, 0-17, 0-18, 0-19, 0-20, 0-21, 0-22, 
0-23, 0-24, 0-25, 0-26, 0-27, 0-28, 0-29, 0-30, 0-31), nrt-test-less -> 
TreeSet(0-0, 0-1, 0-2, 0-3, 0-4, 0-5, 0-6, 0-7, 0-8, 0-9, 0-10, 0-11, 0-12, 
0-13, 0-14, 0-15, 0-16, 0-17, 0-18, 0-19, 0-20, 0-21, 0-22, 0-23, 0-24, 0-25, 
0-26, 0-27, 0-28, 0-29, 0-30, 0-31), nrt-test -> TreeSet(0-0, 0-1, 0-2, 0-3, 
0-4, 0-5, 0-6, 0-7, 0-8, 0-9, 0-10, 0-11, 0-12, 0-13, 0-14, 0-15, 0-16, 0-17, 
0-18, 0-19, 0-20, 0-21, 0-22, 0-23, 0-24, 0-25, 0-26, 0-27, 0-28, 0-29, 0-30, 
0-31), install-test -> TreeSet(0-0, 0-1, 0-2, 0-3, 0-4, 0-5, 0-6, 0-7, 0-8, 
0-9, 0-10, 0-11, 0-12, 0-13, 0-14, 0-15, 0-16, 0-17, 0-18, 0-19, 0-20, 0-21, 
0-22, 0-23, 0-24, 0-25, 0-26, 0-27, 0-28, 0-29, 0-30, 0-31), no_appkey -> 
TreeSet(0-0), parse_exception -> TreeSet(0-0, 0-1, 0-2, 0-3, 0-4, 0-5, 0-6, 
0-7, 0-8, 0-9, 0-10, 0-11, 0-12, 0-13, 0-14, 0-15, 0-16, 0-17, 0-18, 0-19, 
0-20, 0-21, 0-22, 0-23, 0-24, 0-25, 0-26, 0-27, 0-28, 0-29, 0-30, 0-31)) 
(kafka.producer.ZKBrokerPartitionInfo$BrokerTopicsListener)
[2012-07-21 20:10:33,328] DEBUG Registering listener on path: 
/brokers/topics/app_log (kafka.producer.ZKBrokerPartitionInfo)
[2012-07-21 20:10:33,329] DEBUG Registering listener on path: 
/brokers/topics/nrt-test-less (kafka.producer.ZKBrokerPartitionInfo)
[2012-07-21 20:10:33,331] DEBUG Registering listener on path: 
/brokers/topics/nrt-test (kafka.producer.ZKBrokerPartitionInfo)
[2012-07-21 20:10:33,332] DEBUG Registering listener on path: 
/brokers/topics/no_appkey (kafka.producer.ZKBrokerPartitionInfo)
[2012-07-21 20:10:33,333] DEBUG Registering listener on path: 
/brokers/topics/install-test (kafka.producer.ZKBrokerPartitionInfo)
[2012-07-21 20:10:33,334] DEBUG Registering listener on path: 
/brokers/topics/parse_exception (kafka.producer.ZKBrokerPartitionInfo)
[2012-07-21 20:10:33,342] TRACE Instantiating Scala Sync Producer 
(kafka.producer.SyncProducer)
[2012-07-21 20:10:33,357] INFO Creating async producer for broker id = 1 at 
XX.XX.XX.229:9092 (kafka.producer.ProducerPool)
[2012-07-21 20:10:33,357] TRACE Instantiating Scala Sync Producer 
(kafka.producer.SyncProducer)
[2012-07-21 20:10:33,359] INFO Creating async producer for broker id = 0 at 
XX.XX.XX.233:9092 (kafka.producer.ProducerPool)
[2012-07-21 20:10:38,366] DEBUG 5007 ms elapsed. Queue time reached. Sending.. 
(kafka.producer.async.ProducerSendThread)
[2012-07-21 20:10:38,366] DEBUG 5008 ms elapsed. Queue time reached. Sending.. 
(kafka.producer.async.ProducerSendThread)
[2012-07-21 20:10:38,367] DEBUG Handling 0 events 
(kafka.producer.async.ProducerSendThread)
[2012-07-21 20:10:38,367] DEBUG Handling 0 events 
(kafka.producer.async.ProducerSendThread)
[2012-07-21 20:10:43,367] DEBUG 5000 ms elapsed. Queue time reached. Sending.. 
(kafka.producer.async.ProducerSendThread)
[2012-07-21 20:10:43,367] DEBUG 5000 ms elapsed. Queue time reached. Sending.. 
(kafka.producer.async.ProducerSendThread)
[2012-07-21 20:10:43,367] DEBUG Handling 0 events 
(kafka.producer.async.ProducerSendThread)
[2012-07-21 20:10:43,368] DEBUG Handling 0 events 
(kafka.producer.async.ProducerSendThread)
...
...
...






And producer log after sending some data:

[2012-07-21 20:12:43,382] DEBUG Handling 0 events 
(kafka.producer.async.ProducerSendThread)
[2012-07-21 20:12:48,382] DEBUG 5000 ms elapsed. Queue time reached. Sending.. 
(kafka.producer.async.ProducerSendThread)
[2012-07-21 20:12:48,382] DEBUG Handling 0 events 
(kafka.producer.async.ProducerSendThread)
[2012-07-21 20:12:48,382] DEBUG 5000 ms elapsed. Queue time reached. Sending.. 
(kafka.producer.async.ProducerSendThread)
[2012-07-21 20:12:48,383] DEBUG Handling 0 events 
(kafka.producer.async.ProducerSendThread)
[2012-07-21 20:12:48,672] DEBUG Getting the number of broker partitions 
registered for topic: app_log (kafka.producer.Producer)
[2012-07-21 20:12:48,675] DEBUG Broker partitions registered for topic: app_log 
= ArrayBuffer(0-0, 0-1, 0-2, 0-3, 0-4, 0-5, 0-6, 0-7, 0-8, 0-9, 0-10, 0-11, 
0-12, 0-13, 0-14, 0-15, 0-16, 0-17, 0-18, 0-19, 0-20, 0-21, 0-22, 0-23, 0-24, 
0-25, 0-26, 0-27, 0-28, 0-29, 0-30, 0-31) (kafka.producer.Producer)
[2012-07-21 20:12:48,675] DEBUG Sending message to broker XX.XX.XX.233:9092 on 
partition 26 (kafka.producer.Producer)
[2012-07-21 20:12:48,679] DEBUG Fetching async producer for broker id: 0 
(kafka.producer.ProducerPool)
[2012-07-21 20:12:48,683] TRACE Added event to send queue for topic: app_log, 
partition: 26:{"os":"Android","access_subtype":"EDGE",...,"tag":"Sync"} 
(kafka.producer.async.AsyncProducer)
[2012-07-21 20:12:48,683] TRACE Remaining queue size: 10000 
(kafka.producer.async.AsyncProducer)
[2012-07-21 20:12:48,683] DEBUG Sending compressed messages 
(kafka.producer.ProducerPool)
[2012-07-21 20:12:48,683] TRACE Dequeued item for topic app_log and partition 
26 (kafka.producer.async.ProducerSendThread)
[2012-07-21 20:12:53,383] DEBUG 5000 ms elapsed. Queue time reached. Sending.. 
(kafka.producer.async.ProducerSendThread)
[2012-07-21 20:12:53,383] DEBUG Handling 0 events 
(kafka.producer.async.ProducerSendThread)
[2012-07-21 20:12:53,383] DEBUG 5000 ms elapsed. Queue time reached. Sending.. 
(kafka.producer.async.ProducerSendThread)
[2012-07-21 20:12:53,383] DEBUG Handling 1 events 
(kafka.producer.async.ProducerSendThread)
[2012-07-21 20:12:53,384] TRACE Handling event for Topic: app_log, Partition: 
26 (kafka.producer.async.DefaultEventHandler)
[2012-07-21 20:12:53,391] TRACE Sending 1 messages with compression codec 1 to 
topic app_log on partition 26 (kafka.producer.async.DefaultEventHandler)
[2012-07-21 20:12:53,399] DEBUG Allocating message byte buffer of size = 696 
(kafka.message.CompressionUtils$)
[2012-07-21 20:12:53,407] TRACE Remaining bytes in iterator = 476 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:12:53,407] TRACE size of data = 476 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:12:53,408] TRACE shallow iterator currValidBytes = 480 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:12:53,409] TRACE Got multi message sets with 480 bytes to send 
(kafka.producer.SyncProducer)
[2012-07-21 20:12:53,412] TRACE verifying sendbuffer of size 501 
(kafka.producer.SyncProducer)
[2012-07-21 20:12:53,414] DEBUG makeNext() in internalIterator: innerDone = 
true (kafka.message.ByteBufferMessageSet)
[2012-07-21 20:12:53,414] TRACE Remaining bytes in iterator = 476 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:12:53,414] TRACE size of data = 476 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:12:53,414] DEBUG Message is compressed. Valid byte count = 0 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:12:53,421] DEBUG makeNext() in internalIterator: innerDone = 
true (kafka.message.ByteBufferMessageSet)
[2012-07-21 20:12:53,421] TRACE Remaining bytes in iterator = 692 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:12:53,421] TRACE size of data = 692 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:12:53,422] DEBUG Message is uncompressed. Valid byte count = 0 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:12:53,422] TRACE currValidBytes = 696 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:12:53,422] DEBUG makeNext() in internalIterator: innerDone = 
false (kafka.message.ByteBufferMessageSet)
[2012-07-21 20:12:53,422] DEBUG makeNext() in internalIterator: innerDone = 
true (kafka.message.ByteBufferMessageSet)
[2012-07-21 20:12:53,423] DEBUG makeNext() in internalIterator: innerDone = 
true (kafka.message.ByteBufferMessageSet)
[2012-07-21 20:12:53,424] INFO Connected to XX.XX.XX.233:9092 for producing 
(kafka.producer.SyncProducer)
[2012-07-21 20:12:53,425] TRACE 505 bytes written. 
(kafka.network.BoundedByteBufferSend)
[2012-07-21 20:12:53,430] DEBUG makeNext() in internalIterator: innerDone = 
true (kafka.message.ByteBufferMessageSet)
[2012-07-21 20:12:53,430] TRACE Remaining bytes in iterator = 476 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:12:53,431] TRACE size of data = 476 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:12:53,431] DEBUG Message is compressed. Valid byte count = 0 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:12:53,431] DEBUG makeNext() in internalIterator: innerDone = 
true (kafka.message.ByteBufferMessageSet)
[2012-07-21 20:12:53,431] TRACE Remaining bytes in iterator = 692 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:12:53,431] TRACE size of data = 692 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:12:53,431] DEBUG Message is uncompressed. Valid byte count = 0 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:12:53,432] TRACE currValidBytes = 696 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:12:53,432] DEBUG makeNext() in internalIterator: innerDone = 
false (kafka.message.ByteBufferMessageSet)
[2012-07-21 20:12:53,433] DEBUG makeNext() in internalIterator: innerDone = 
true (kafka.message.ByteBufferMessageSet)
[2012-07-21 20:12:53,435] DEBUG makeNext() in internalIterator: innerDone = 
true (kafka.message.ByteBufferMessageSet)
[2012-07-21 20:12:53,435] TRACE kafka producer sent messages for topics 
Map((app_log,26) -> ByteBufferMessageSet(MessageAndOffset(message(magic = 1, 
attributes = 0, crc = 296392111, payload = java.nio.HeapByteBuffer[pos=0 
lim=686 cap=686]),480), )) to broker XX.XX.XX.233:9092 (on attempt 1) 
(kafka.producer.async.DefaultEventHandler)
[2012-07-21 20:12:58,383] DEBUG 5000 ms elapsed. Queue time reached. Sending.. 
(kafka.producer.async.ProducerSendThread)
[2012-07-21 20:12:58,383] DEBUG Handling 0 events 
(kafka.producer.async.ProducerSendThread)
[2012-07-21 20:12:58,435] DEBUG 5000 ms elapsed. Queue time reached. Sending.. 
(kafka.producer.async.ProducerSendThread)
[2012-07-21 20:12:58,436] DEBUG Handling 0 events 
(kafka.producer.async.ProducerSendThread)
[2012-07-21 20:12:59,427] DEBUG Getting the number of broker partitions 
registered for topic: app_log (kafka.producer.Producer)
[2012-07-21 20:12:59,428] DEBUG Broker partitions registered for topic: app_log 
= ArrayBuffer(0-0, 0-1, 0-2, 0-3, 0-4, 0-5, 0-6, 0-7, 0-8, 0-9, 0-10, 0-11, 
0-12, 0-13, 0-14, 0-15, 0-16, 0-17, 0-18, 0-19, 0-20, 0-21, 0-22, 0-23, 0-24, 
0-25, 0-26, 0-27, 0-28, 0-29, 0-30, 0-31) (kafka.producer.Producer)
[2012-07-21 20:12:59,428] DEBUG Sending message to broker XX.XX.XX.233:9092 on 
partition 26 (kafka.producer.Producer)
[2012-07-21 20:12:59,428] DEBUG Fetching async producer for broker id: 0 
(kafka.producer.ProducerPool)
[2012-07-21 20:12:59,429] TRACE Dequeued item for topic app_log and partition 
26 (kafka.producer.async.ProducerSendThread)
[2012-07-21 20:12:59,429] TRACE Added event to send queue for topic: app_log, 
partition: 26:{"os":"Android","access_subtype":"EDGE",...,"city":""} 
(kafka.producer.async.AsyncProducer)
[2012-07-21 20:12:59,429] TRACE Remaining queue size: 10000 
(kafka.producer.async.AsyncProducer)
[2012-07-21 20:12:59,429] DEBUG Sending compressed messages 
(kafka.producer.ProducerPool)
[2012-07-21 20:13:03,384] DEBUG 5001 ms elapsed. Queue time reached. Sending.. 
(kafka.producer.async.ProducerSendThread)
[2012-07-21 20:13:03,384] DEBUG Handling 0 events 
(kafka.producer.async.ProducerSendThread)
[2012-07-21 20:13:03,436] DEBUG 5000 ms elapsed. Queue time reached. Sending.. 
(kafka.producer.async.ProducerSendThread)
[2012-07-21 20:13:03,436] DEBUG Handling 1 events 
(kafka.producer.async.ProducerSendThread)
[2012-07-21 20:13:03,436] TRACE Handling event for Topic: app_log, Partition: 
26 (kafka.producer.async.DefaultEventHandler)
[2012-07-21 20:13:03,437] TRACE Sending 1 messages with compression codec 1 to 
topic app_log on partition 26 (kafka.producer.async.DefaultEventHandler)
[2012-07-21 20:13:03,437] DEBUG Allocating message byte buffer of size = 725 
(kafka.message.CompressionUtils$)
[2012-07-21 20:13:03,437] TRACE Remaining bytes in iterator = 468 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:13:03,437] TRACE size of data = 468 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:13:03,437] TRACE shallow iterator currValidBytes = 472 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:13:03,438] TRACE Got multi message sets with 472 bytes to send 
(kafka.producer.SyncProducer)
[2012-07-21 20:13:03,438] TRACE verifying sendbuffer of size 493 
(kafka.producer.SyncProducer)
[2012-07-21 20:13:03,438] DEBUG makeNext() in internalIterator: innerDone = 
true (kafka.message.ByteBufferMessageSet)
[2012-07-21 20:13:03,438] TRACE Remaining bytes in iterator = 468 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:13:03,438] TRACE size of data = 468 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:13:03,438] DEBUG Message is compressed. Valid byte count = 0 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:13:03,438] DEBUG makeNext() in internalIterator: innerDone = 
true (kafka.message.ByteBufferMessageSet)
[2012-07-21 20:13:03,439] TRACE Remaining bytes in iterator = 721 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:13:03,439] TRACE size of data = 721 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:13:03,439] DEBUG Message is uncompressed. Valid byte count = 0 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:13:03,439] TRACE currValidBytes = 725 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:13:03,439] DEBUG makeNext() in internalIterator: innerDone = 
false (kafka.message.ByteBufferMessageSet)
[2012-07-21 20:13:03,439] DEBUG makeNext() in internalIterator: innerDone = 
true (kafka.message.ByteBufferMessageSet)
[2012-07-21 20:13:03,439] DEBUG makeNext() in internalIterator: innerDone = 
true (kafka.message.ByteBufferMessageSet)
[2012-07-21 20:13:03,439] TRACE 497 bytes written. 
(kafka.network.BoundedByteBufferSend)
[2012-07-21 20:13:03,440] DEBUG makeNext() in internalIterator: innerDone = 
true (kafka.message.ByteBufferMessageSet)
[2012-07-21 20:13:03,440] TRACE Remaining bytes in iterator = 468 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:13:03,440] TRACE size of data = 468 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:13:03,440] DEBUG Message is compressed. Valid byte count = 0 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:13:03,440] DEBUG makeNext() in internalIterator: innerDone = 
true (kafka.message.ByteBufferMessageSet)
[2012-07-21 20:13:03,440] TRACE Remaining bytes in iterator = 721 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:13:03,441] TRACE size of data = 721 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:13:03,441] DEBUG Message is uncompressed. Valid byte count = 0 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:13:03,441] TRACE currValidBytes = 725 
(kafka.message.ByteBufferMessageSet)
[2012-07-21 20:13:03,441] DEBUG makeNext() in internalIterator: innerDone = 
false (kafka.message.ByteBufferMessageSet)
[2012-07-21 20:13:03,441] DEBUG makeNext() in internalIterator: innerDone = 
true (kafka.message.ByteBufferMessageSet)
[2012-07-21 20:13:03,441] DEBUG makeNext() in internalIterator: innerDone = 
true (kafka.message.ByteBufferMessageSet)
[2012-07-21 20:13:03,441] TRACE kafka producer sent messages for topics 
Map((app_log,26) -> ByteBufferMessageSet(MessageAndOffset(message(magic = 1, 
attributes = 0, crc = 1163603178, payload = java.nio.HeapByteBuffer[pos=0 
lim=715 cap=715]),472), )) to broker XX.XX.XX.233:9092 (on attempt 1) 
(kafka.producer.async.DefaultEventHandler)
[2012-07-21 20:13:08,385] DEBUG 5000 ms elapsed. Queue time reached. Sending.. 
(kafka.producer.async.ProducerSendThread)
[2012-07-21 20:13:08,385] DEBUG Handling 0 events 
(kafka.producer.async.ProducerSendThread)
[2012-07-21 20:13:08,442] DEBUG 5001 ms elapsed. Queue time reached. Sending.. 
(kafka.producer.async.ProducerSendThread)
[2012-07-21 20:13:08,442] DEBUG Handling 0 events 
(kafka.producer.async.ProducerSendThread)
  



--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira


Reply via email to