[jira] [Commented] (KAFKA-961) state.change.logger: Error on broker 1 while processing LeaderAndIsr request correlationId 6 received from controller 1 epoch 1 for partition (page_visits,0)

2013-07-07 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13701753#comment-13701753
 ] 

Jun Rao commented on KAFKA-961:
---

You can embed KafkaServerStartable instead. It just wraps KafkaServer.

 state.change.logger: Error on broker 1 while processing LeaderAndIsr request 
 correlationId 6 received from controller 1 epoch 1 for partition 
 (page_visits,0)
 -

 Key: KAFKA-961
 URL: https://issues.apache.org/jira/browse/KAFKA-961
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
 Environment: Linux gman-minty 3.8.0-19-generic #29-Ubuntu SMP Wed Apr 
 17 18:16:28 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux
Reporter: Garrett Barton

 Been having issues embedding 0.8 servers into some Yarn stuff I'm doing. I 
 just pulled the latest from git, ran ./sbt +package followed by ./sbt 
 assembly-package-dependency, and pushed 
 core/target/scala-2.8.0/kafka_2.8.0-0.8.0-beta1.jar into my local mvn repo.  
 Here is sample code, pulled out into small classes, that shows my error.
 Starting up a broker embedded in Java, with the following code:
 ...
   Properties props = new Properties();
   // don't set, so it binds to all interfaces
   // props.setProperty("hostname", hostName);
   props.setProperty("port", );
   props.setProperty("broker.id", "1");
   props.setProperty("log.dir", "/tmp/embeddedkafka/" + randId);
   // TODO: hardcode bad
   props.setProperty("zookeeper.connect", "localhost:2181/" + randId);
   KafkaConfig kconf = new KafkaConfig(props);
   
   server = new KafkaServer(kconf, null);
   server.startup();
   LOG.info("Broker online");
 Sample Producer has the following code:
 ...
   Properties props = new Properties();
   props.put("metadata.broker.list", "gman-minty:");
   props.put("serializer.class", "kafka.serializer.StringEncoder");
   props.put("partitioner.class", "com.gman.broker.SimplePartitioner");
   props.put("request.required.acks", "1");
   ProducerConfig config = new ProducerConfig(props);
   
   Producer<String, String> producer = new Producer<String, String>(config);
   LOG.info("producer created");
   KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", "key1", "value1");
   producer.send(data);
   LOG.info("wrote message: " + data);
 And here is the server log:
 INFO  2013-07-03 13:47:30,538 [Thread-0] kafka.utils.VerifiableProperties: Verifying properties
 INFO  2013-07-03 13:47:30,568 [Thread-0] kafka.utils.VerifiableProperties: Property port is overridden to 
 INFO  2013-07-03 13:47:30,568 [Thread-0] kafka.utils.VerifiableProperties: Property broker.id is overridden to 1
 INFO  2013-07-03 13:47:30,568 [Thread-0] kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to localhost:2181/kafkatest
 INFO  2013-07-03 13:47:30,569 [Thread-0] kafka.utils.VerifiableProperties: Property log.dir is overridden to \tmp\embeddedkafka\1372873650268
 INFO  2013-07-03 13:47:30,574 [Thread-0] kafka.server.KafkaServer: [Kafka Server 1], Starting
 INFO  2013-07-03 13:47:30,609 [Thread-0] kafka.log.LogManager: [Log Manager on Broker 1] Log directory '/home/gman/workspace/distributed_parser/\tmp\embeddedkafka\1372873650268' not found, creating it.
 INFO  2013-07-03 13:47:30,619 [Thread-0] kafka.log.LogManager: [Log Manager on Broker 1] Starting log cleaner every 60 ms
 INFO  2013-07-03 13:47:30,630 [Thread-0] kafka.log.LogManager: [Log Manager on Broker 1] Starting log flusher every 3000 ms with the following overrides Map()
 INFO  2013-07-03 13:47:30,687 [Thread-0] kafka.network.Acceptor: Awaiting socket connections on 0.0.0.0:.
 INFO  2013-07-03 13:47:30,688 [Thread-0] kafka.network.SocketServer: [Socket Server on Broker 1], Started
 INFO  2013-07-03 13:47:30,696 [Thread-0] kafka.server.KafkaZooKeeper: connecting to ZK: localhost:2181/kafkatest
 INFO  2013-07-03 13:47:30,707 [ZkClient-EventThread-17-localhost:2181/kafkatest] org.I0Itec.zkclient.ZkEventThread: Starting ZkClient event thread.
 INFO  2013-07-03 13:47:30,716 [Thread-0] org.apache.zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.2-1221870, built on 12/21/2011 20:46 GMT
 INFO  2013-07-03 13:47:30,717 [Thread-0] org.apache.zookeeper.ZooKeeper: Client environment:host.name=gman-minty
 INFO  2013-07-03 13:47:30,717 [Thread-0] 

[jira] [Commented] (KAFKA-961) state.change.logger: Error on broker 1 while processing LeaderAndIsr request correlationId 6 received from controller 1 epoch 1 for partition (page_visits,0)

2013-07-03 Thread Garrett Barton (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13699489#comment-13699489
 ] 

Garrett Barton commented on KAFKA-961:
--

I found out what is going on.  Since I'm doing this in Java (excuse my lack of 
Scala knowledge here), I had to pass the KafkaServer constructor a value for 
the Time object.  Apparently SystemTime does not convert to the Time 
interface/trait in Java, so I left it as null, thinking that the Scala bit in 
the constructor: 
time: Time = SystemTime
would init one for me.  Well, it doesn't.  I made an inner class implementing 
Time that has SystemTime's functionality and passed that into the constructor, 
and now I can get past my NPE.

Is this expected behavior?
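
The failure mode described above can be reproduced without Kafka. A Scala default argument is filled in by the Scala compiler only at call sites that omit the argument; a Java caller that passes null explicitly just gets null. The following is a minimal sketch under that assumption. Time, WallClockTime and EmbeddedServer are hypothetical stand-ins, not Kafka's real classes.

```java
class NullTimeDemo {
    // Hypothetical stand-in for a clock abstraction; not Kafka's actual trait.
    interface Time {
        long milliseconds();
    }

    // Hand-written clock with wall-clock behaviour, playing the role of the workaround.
    static final class WallClockTime implements Time {
        @Override
        public long milliseconds() {
            return System.currentTimeMillis();
        }
    }

    // Hypothetical server that stores the clock it is given without checking it.
    static final class EmbeddedServer {
        private final Time time;

        EmbeddedServer(Time time) {
            this.time = time; // null stays null: no default is substituted here
        }

        long startup() {
            return time.milliseconds(); // NullPointerException if time is null
        }
    }

    public static void main(String[] args) {
        try {
            new EmbeddedServer(null).startup();
        } catch (NullPointerException e) {
            System.out.println("null clock: NullPointerException on first use");
        }
        long startedAt = new EmbeddedServer(new WallClockTime()).startup();
        System.out.println("explicit clock: started at " + startedAt);
    }
}
```

Note that in this sketch the exception surfaces at first use of the clock, not in the constructor, which may be why the original error appeared far from the call that passed null.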


[jira] [Commented] (KAFKA-961) state.change.logger: Error on broker 1 while processing LeaderAndIsr request correlationId 6 received from controller 1 epoch 1 for partition (page_visits,0)

2013-07-03 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13699567#comment-13699567
 ] 

Joel Koshy commented on KAFKA-961:
--

Passing in null for time would definitely lead to that NPE, as you found. I 
think we only needed a Time interface to support a mock time for tests. Also, 
we probably didn't anticipate that KafkaServers would need to be embedded in 
Java code. If you are okay with your work-around, then great. Another (ugly) 
way to do it would be to pass in a dynamically instantiated SystemTime, so 
something like (Time) Class.forName(SystemTime.class.getName()).newInstance(). 
Not sure if it will work though. We can also provide an explicit constructor 
without the time argument and get rid of the Scala default arg.
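
On the reflection idea: the Scala compiler emits a singleton `object X` as a class named `X$` with a private constructor and a public static `MODULE$` field holding the instance, so reading that field is usually more promising from Java than calling newInstance(). The sketch below imitates that layout in plain Java. The class names are hypothetical, and whether Kafka's jar exposes SystemTime in exactly this form is an assumption to verify against the actual class files.

```java
import java.lang.reflect.Field;

class ScalaObjectDemo {
    // Hypothetical stand-in for a clock trait.
    public interface Time {
        long milliseconds();
    }

    // Plain-Java imitation of the layout scalac produces for a singleton object:
    // a class name ending in '$', a private constructor, and a static MODULE$ field.
    public static final class SystemTime$ implements Time {
        public static final SystemTime$ MODULE$ = new SystemTime$();

        private SystemTime$() {
        }

        @Override
        public long milliseconds() {
            return System.currentTimeMillis();
        }
    }

    // Looks up the singleton by class name and reads its static MODULE$ field.
    static Time loadSingleton(String className) throws ReflectiveOperationException {
        Field module = Class.forName(className).getField("MODULE$");
        return (Time) module.get(null); // static field, so no receiver object
    }

    public static void main(String[] args) throws Exception {
        Time direct = SystemTime$.MODULE$;
        Time reflective = loadSingleton(SystemTime$.class.getName());
        System.out.println("same instance: " + (direct == reflective));
    }
}
```

In this layout newInstance() on the `$` class would fail because the constructor is private, whereas both the direct `MODULE$` reference and the reflective lookup return the one existing instance.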



[jira] [Commented] (KAFKA-961) state.change.logger: Error on broker 1 while processing LeaderAndIsr request correlationId 6 received from controller 1 epoch 1 for partition (page_visits,0)

2013-07-03 Thread Garrett Barton (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13699741#comment-13699741
 ] 

Garrett Barton commented on KAFKA-961:
--

Thanks for the reply! I personally lean towards the cleaner constructor, as I 
think that logic should be defaulted in the class rather than my having to 
pass one in every time I use it. Having said that, I am probably doing odd 
things with the embedding of Kafka, so I understand if you think differently.
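
For reference, the "cleaner constructor" option could look roughly like the sketch below: an explicit overload that supplies the default clock, alongside the full constructor that tests can still use to inject a mock. All names here (EmbeddedServer, SystemClock, the String config) are hypothetical and only illustrate the shape of the change, not Kafka's actual code.

```java
import java.util.Objects;

class DefaultClockDemo {
    // Hypothetical stand-in for a clock abstraction.
    interface Time {
        long milliseconds();
    }

    static final class SystemClock implements Time {
        @Override
        public long milliseconds() {
            return System.currentTimeMillis();
        }
    }

    static final class EmbeddedServer {
        private final String config;
        private final Time time;

        // Convenience overload: callers that do not care about time get the default clock.
        EmbeddedServer(String config) {
            this(config, new SystemClock());
        }

        // Full constructor, kept so tests can inject a mock clock; rejects null early.
        EmbeddedServer(String config, Time time) {
            this.config = Objects.requireNonNull(config, "config");
            this.time = Objects.requireNonNull(time, "time");
        }

        Time time() {
            return time;
        }

        long now() {
            return time.milliseconds();
        }
    }

    public static void main(String[] args) {
        EmbeddedServer defaulted = new EmbeddedServer("broker-1");
        System.out.println("default clock in use: " + (defaulted.time() instanceof SystemClock));

        EmbeddedServer mocked = new EmbeddedServer("broker-1", () -> 42L);
        System.out.println("mock clock reads: " + mocked.now());
    }
}
```

Rejecting null in the full constructor is a design choice of this sketch: it moves the failure to the call site instead of a later request-handling path.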
