[jira] [Commented] (KAFKA-961) state.change.logger: Error on broker 1 while processing LeaderAndIsr request correlationId 6 received from controller 1 epoch 1 for partition (page_visits,0)
[ https://issues.apache.org/jira/browse/KAFKA-961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13701753#comment-13701753 ]

Jun Rao commented on KAFKA-961:
-------------------------------

You can embed KafkaServerStartable instead. It just wraps KafkaServer.

state.change.logger: Error on broker 1 while processing LeaderAndIsr request correlationId 6 received from controller 1 epoch 1 for partition (page_visits,0)
---

Key: KAFKA-961
URL: https://issues.apache.org/jira/browse/KAFKA-961
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8
Environment: Linux gman-minty 3.8.0-19-generic #29-Ubuntu SMP Wed Apr 17 18:16:28 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux
Reporter: Garrett Barton

Been having issues embedding 0.8 servers into some Yarn stuff I'm doing. I just pulled the latest from git, did a ./sbt +package, followed by ./sbt assembly-package-dependency, and pushed core/target/scala-2.8.0/kafka_2.8.0-0.8.0-beta1.jar into my local mvn repo. Here is sample code ripped out to little classes to show my error:

Starting up a broker embedded in java, with the following code:

    ...
    Properties props = new Properties();
    // dont set so it binds to all interfaces
    // props.setProperty("hostname", hostName);
    props.setProperty("port", );
    props.setProperty("broker.id", "1");
    props.setProperty("log.dir", "/tmp/embeddedkafka/" + randId);
    // TODO: hardcode bad
    props.setProperty("zookeeper.connect", "localhost:2181/" + randId);
    KafkaConfig kconf = new KafkaConfig(props);
    server = new KafkaServer(kconf, null);
    server.startup();
    LOG.info("Broker online");

Sample Producer has the following code:

    ...
    Properties props = new Properties();
    props.put("metadata.broker.list", "gman-minty:");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("partitioner.class", "com.gman.broker.SimplePartitioner");
    props.put("request.required.acks", "1");
    ProducerConfig config = new ProducerConfig(props);
    Producer<String, String> producer = new Producer<String, String>(config);
    LOG.info("producer created");
    KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", "key1", "value1");
    producer.send(data);
    LOG.info("wrote message: " + data);

And here is the server log:

INFO 2013-07-03 13:47:30,538 [Thread-0] kafka.utils.VerifiableProperties: Verifying properties
INFO 2013-07-03 13:47:30,568 [Thread-0] kafka.utils.VerifiableProperties: Property port is overridden to
INFO 2013-07-03 13:47:30,568 [Thread-0] kafka.utils.VerifiableProperties: Property broker.id is overridden to 1
INFO 2013-07-03 13:47:30,568 [Thread-0] kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to localhost:2181/kafkatest
INFO 2013-07-03 13:47:30,569 [Thread-0] kafka.utils.VerifiableProperties: Property log.dir is overridden to \tmp\embeddedkafka\1372873650268
INFO 2013-07-03 13:47:30,574 [Thread-0] kafka.server.KafkaServer: [Kafka Server 1], Starting
INFO 2013-07-03 13:47:30,609 [Thread-0] kafka.log.LogManager: [Log Manager on Broker 1] Log directory '/home/gman/workspace/distributed_parser/\tmp\embeddedkafka\1372873650268' not found, creating it.
INFO 2013-07-03 13:47:30,619 [Thread-0] kafka.log.LogManager: [Log Manager on Broker 1] Starting log cleaner every 60 ms
INFO 2013-07-03 13:47:30,630 [Thread-0] kafka.log.LogManager: [Log Manager on Broker 1] Starting log flusher every 3000 ms with the following overrides Map()
INFO 2013-07-03 13:47:30,687 [Thread-0] kafka.network.Acceptor: Awaiting socket connections on 0.0.0.0:.
INFO 2013-07-03 13:47:30,688 [Thread-0] kafka.network.SocketServer: [Socket Server on Broker 1], Started
INFO 2013-07-03 13:47:30,696 [Thread-0] kafka.server.KafkaZooKeeper: connecting to ZK: localhost:2181/kafkatest
INFO 2013-07-03 13:47:30,707 [ZkClient-EventThread-17-localhost:2181/kafkatest] org.I0Itec.zkclient.ZkEventThread: Starting ZkClient event thread.
INFO 2013-07-03 13:47:30,716 [Thread-0] org.apache.zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.2-1221870, built on 12/21/2011 20:46 GMT
INFO 2013-07-03 13:47:30,717 [Thread-0] org.apache.zookeeper.ZooKeeper: Client environment:host.name=gman-minty
INFO 2013-07-03 13:47:30,717 [Thread-0]
[jira] [Commented] (KAFKA-961) state.change.logger: Error on broker 1 while processing LeaderAndIsr request correlationId 6 received from controller 1 epoch 1 for partition (page_visits,0)
[ https://issues.apache.org/jira/browse/KAFKA-961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13699489#comment-13699489 ]

Garrett Barton commented on KAFKA-961:
--------------------------------------

I found out what is going on. Since I'm doing this in Java (excuse my lack of Scala knowledge here), I had to pass the KafkaServer constructor a value for the Time object. Since SystemTime apparently does not convert to the Time interface/trait in Java, I left it as null, thinking that the Scala bit in the constructor:

    time: Time = SystemTime

would init one for me. Well, it doesn't. I made an inner class of Time that had SystemTime's functionality and passed that into the constructor, and now I can get past my NPE. Is this expected behavior?
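The workaround described in this comment can be sketched in plain Java without Kafka on the classpath. Everything below is a stand-in written for illustration: the `Time` interface only assumes that the real trait exposes a millisecond clock, a nanosecond clock and a sleep method, and `EmbeddedServer` is a hypothetical class that, like the failing case, dereferences its clock during startup. It is not Kafka's code.

```java
// Hypothetical stand-in for the Time trait; the method set is an assumption.
interface Time {
    long milliseconds();
    long nanoseconds();
    void sleep(long ms);
}

// Java-side implementation that delegates to the JVM's system clock.
class JavaSystemTime implements Time {
    public long milliseconds() { return System.currentTimeMillis(); }
    public long nanoseconds() { return System.nanoTime(); }
    public void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }
}

// Hypothetical server that uses its clock on startup and never checks for null.
class EmbeddedServer {
    private final Time time;
    private long startedAtMs = -1L;

    EmbeddedServer(Time time) { this.time = time; }

    // Throws NullPointerException when the server was built with a null clock.
    void startup() { startedAtMs = time.milliseconds(); }

    long startedAtMs() { return startedAtMs; }
}

class TimeWorkaroundDemo {
    // Returns true if startup() fails with a NullPointerException.
    static boolean startupThrowsNpe(Time time) {
        try {
            new EmbeddedServer(time).startup();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("null clock fails: " + startupThrowsNpe(null));
        System.out.println("explicit clock fails: " + startupThrowsNpe(new JavaSystemTime()));
    }
}
```

The point of the sketch is that a Scala default argument is not applied when the constructor is called from Java, so passing null really does leave the field null; supplying any concrete implementation avoids the failure.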
[jira] [Commented] (KAFKA-961) state.change.logger: Error on broker 1 while processing LeaderAndIsr request correlationId 6 received from controller 1 epoch 1 for partition (page_visits,0)
[ https://issues.apache.org/jira/browse/KAFKA-961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13699567#comment-13699567 ]

Joel Koshy commented on KAFKA-961:
----------------------------------

Passing in null for time would definitely lead to that NPE, as you found. I think we only needed a time interface to support a mock time for tests. Also, we probably didn't anticipate that KafkaServers would need to be embedded in Java code. If you are okay with your workaround, then great. Another (ugly) way to do it would be to pass in a dynamically instantiated SystemTime, so something like (Time) Class.forName(SystemTime.class.getName()).newInstance(). I'm not sure if it will work, though. We can also provide an explicit constructor without the time argument and get rid of the Scala default arg.
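The last option mentioned in this comment, an explicit constructor without the time argument, could look roughly like the following when written in Java. All names here (`ServerTime`, `DefaultServerTime`, `FixedTime`, `ConfigurableServer`) are hypothetical and chosen for illustration; this is a sketch of the overloading pattern, not the change that was made to KafkaServer.

```java
// Hypothetical clock abstraction, reduced to the single method the sketch needs.
interface ServerTime {
    long milliseconds();
}

// Default clock backed by the JVM's wall-clock time.
class DefaultServerTime implements ServerTime {
    public long milliseconds() { return System.currentTimeMillis(); }
}

// Fixed clock of the kind a test might inject.
class FixedTime implements ServerTime {
    private final long nowMs;
    FixedTime(long nowMs) { this.nowMs = nowMs; }
    public long milliseconds() { return nowMs; }
}

class ConfigurableServer {
    private final ServerTime time;

    // Convenience constructor: callers get the default clock without naming it.
    ConfigurableServer() {
        this(new DefaultServerTime());
    }

    // Full constructor: tests can inject a mock clock; null is rejected up front
    // so the failure points at the caller instead of surfacing later as an NPE.
    ConfigurableServer(ServerTime time) {
        if (time == null) {
            throw new IllegalArgumentException("time must not be null");
        }
        this.time = time;
    }

    long now() { return time.milliseconds(); }
}
```

With an overload like this, Java callers never have to supply the clock, while tests keep the ability to pass a mock.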
[jira] [Commented] (KAFKA-961) state.change.logger: Error on broker 1 while processing LeaderAndIsr request correlationId 6 received from controller 1 epoch 1 for partition (page_visits,0)
[ https://issues.apache.org/jira/browse/KAFKA-961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13699741#comment-13699741 ]

Garrett Barton commented on KAFKA-961:
--------------------------------------

Thanks for the reply! I personally lean towards the cleaner constructor, as I think that logic should be defaulted in the class instead of my having to pass one in every time I use it. Having said that, I am probably doing odd things with the embedding of Kafka, so I understand if you think differently.