If you are using it for tests, this works with Kafka 10 (tune broker configs as per your req)
public class TestKafkaCluster { final KafkaServer kafkaServer; final ZkClient zkClient; private String zkConnectionString; public TestKafkaCluster(String zkConnectionString, int kafkaPort) throws Exception { this.zkConnectionString = zkConnectionString; zkClient = new ZkClient(zkConnectionString, 30000, 30000, ZKStringSerializer$.MODULE$); //zkClient.createPersistent("/flo/kafka", true); final KafkaConfig config = getKafkaConfig(zkConnectionString, kafkaPort); config.port(); final Time mock = new MockTime(); kafkaServer = TestUtils.createServer(config, mock); } private static KafkaConfig getKafkaConfig(final String zkConnectString, int port) { final Properties brokerConfig = TestUtils.createBrokerConfig(1, zkConnectString, false, false, port, Option.apply(SecurityProtocol.PLAINTEXT), Option.empty(), Option.empty(), false, false, 0, false, 0, false, 0, Option.empty()); brokerConfig.put("default.replication.factor", String.valueOf(1)); return new KafkaConfig(brokerConfig); } public KafkaServer getKafkaServer() { return kafkaServer; } public void stop() throws IOException { kafkaServer.shutdown(); zkClient.close(); } public String getZkConnectionString() { return zkConnectionString; } } On Fri, Jun 17, 2016 at 2:18 AM, Ismael Juma <ism...@juma.me.uk> wrote: > Try using kafka.server.KafkaServerStartable instead. It should do the right > thing. > > Ismael > > On Thu, Jun 16, 2016 at 7:18 PM, Subhash Agrawal <agraw...@opentext.com> > wrote: > > > Thanks Ismael. > > I am instantiating kafkaserver instance like this. > > new KafkaServer(kafkaConfig,null,null); > > > > I tried to use > > new KafkaServer(kafkaConfig); but it does not compile with kafka 0.10.0. > > > > All the example I see uses > > new KafkaServer(kafkaConfig); > > > > Do we support new KafkaServer(kafkaConfig); with kafka 0.10.0? if not, > > how can I pass > > these parameters? It used to work with kafka 0.7.1. > > > > Thanks > > Subhash Agrawal > > > > > > -----Original Message----- > > From: isma...@gmail.com [mailto:isma...@gmail.com] On Behalf Of Ismael > > Juma > > Sent: Thursday, June 16, 2016 1:38 AM > > To: dev@kafka.apache.org > > Subject: Re: Embedding zookeeper and kafka in java process. > > > > Hi Subhash, > > > > This would happen if `null` is passed as the `threadNamePrefix` argument > > when instantiating `KafkaServer`: > > > > class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, > > threadNamePrefix: Option[String] = None) extends Logging with > > KafkaMetricsGroup > > > > How are you starting Kafka in your Java process? > > > > Ismael > > > > On Thu, Jun 16, 2016 at 1:26 AM, Subhash Agrawal <agraw...@opentext.com> > > wrote: > > > > > Thanks for quick response. > > > I started zookeeper via zookeeper-server-start.bat and started kafka > via > > > my java process and I saw same error. > > > But if I start zookeeper via java process and start kafka via > > > kafka-server-start.bat, t works fine. > > > It means it is not caused due to both getting started in same process. > It > > > must be some kafka specific issue. > > > > > > Subhash Agrawal > > > > > > -----Original Message----- > > > From: Guozhang Wang [mailto:wangg...@gmail.com] > > > Sent: Wednesday, June 15, 2016 3:42 PM > > > To: dev@kafka.apache.org > > > Subject: Re: Embedding zookeeper and kafka in java process. > > > > > > It seems "scala.MatchError: null" are not related to the settings that > ZK > > > and Kafka is embedded in the same process, and the only case that I can > > > think of related is this: > > https://issues.apache.org/jira/browse/KAFKA-940. > > > > > > Could you clarify if you start these two services on two processes, the > > > issue goes away? > > > > > > Guozhang > > > > > > On Wed, Jun 15, 2016 at 1:56 PM, Subhash Agrawal < > agraw...@opentext.com> > > > wrote: > > > > > > > Hi All, > > > > I am embedding Kafka 0.10.0 and corresponding zookeeper in java > > process. > > > > In this process, I start zookeeper first and then wait for 10 seconds > > and > > > > then start kafka. These are all running in the same process. Toward > the > > > > end of kafka startup, I see following exception. It seems zookeeper > is > > > not > > > > able > > > > to add the newly created kafka instance. Have you seen this error > > > > earlier? I have only single node kafka. > > > > > > > > Let me know if you have any suggestions. I will really appreciate any > > > help > > > > on this. > > > > > > > > Thanks > > > > Subhash Agrawal. > > > > > > > > [2016-06-15 13:39:39,616] INFO [Logserver_Starter] Registered broker > 0 > > at > > > > path /brokers/ids/0 with addresses: PLAINTEXT -> > > > > EndPoint(localhost,8392,PLAINTEXT) (kafka.utils.ZkUtils) > > > > [2016-06-15 13:39:39,617] WARN [Logserver_Starter] No meta.properties > > > file > > > > under dir C:\development \newkafka-logs\meta.properties > > > > (kafka.server.BrokerMetadataCheckpoint) > > > > [2016-06-15 13:39:39,627] INFO > [ZkClient-EventThread-24-localhost:2181] > > > > New leader is 0 > > > (kafka.server.ZookeeperLeaderElector$LeaderChangeListener) > > > > [2016-06-15 13:39:39,629] INFO > [ZkClient-EventThread-24-localhost:2181] > > > > [BrokerChangeListener on Controller 0]: Broker change listener fired > > for > > > > path /brokers/ids with children 0 > > > > (kafka.controller.ReplicaStateMachine$BrokerChangeListener) > > > > [2016-06-15 13:39:39,638] INFO [Logserver_Starter] Kafka version : > > > > 0.10.0.0 (org.apache.kafka.common.utils.AppInfoParser) > > > > [2016-06-15 13:39:39,638] INFO [Logserver_Starter] Kafka commitId : > > > > b8642491e78c5a13 (org.apache.kafka.common.utils.AppInfoParser) > > > > [2016-06-15 13:39:39,639] INFO [Logserver_Starter] [Kafka Server 0], > > > > started (kafka.server.KafkaServer) > > > > [2016-06-15 13:39:39,806] INFO > [ZkClient-EventThread-24-localhost:2181] > > > > [BrokerChangeListener on Controller 0]: Newly added brokers: 0, > deleted > > > > brokers: , all live brokers: 0 > > > > (kafka.controller.ReplicaStateMachine$BrokerChangeListener) > > > > [2016-06-15 13:39:39,808] DEBUG > > [ZkClient-EventThread-24-localhost:2181] > > > > [Channel manager on controller 0]: Controller 0 trying to connect to > > > broker > > > > 0 (kafka.controller.ControllerChannelManager) > > > > [2016-06-15 13:39:39,818] ERROR > > [ZkClient-EventThread-24-localhost:2181] > > > > [BrokerChangeListener on Controller 0]: Error while handling broker > > > changes > > > > (kafka.controller.ReplicaStateMachine$BrokerChangeListener) > > > > scala.MatchError: null > > > > at > > > > > > > > > > kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:122) > > > > at > > > > > > > > > > kafka.controller.ControllerChannelManager.addBroker(ControllerChannelManager.scala:74) > > > > at > > > > > > > > > > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$4.apply(ReplicaStateMachine.scala:372) > > > > at > > > > > > > > > > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$4.apply(ReplicaStateMachine.scala:372) > > > > at > > > > scala.collection.immutable.Set$Set1.foreach(Set.scala:94) > > > > at > > > > > > > > > > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:372) > > > > at > > > > > > > > > > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) > > > > at > > > > > > > > > > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) > > > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > > > > at > > > > > > > > > > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358) > > > > at > > > > > > > > > > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) > > > > at > > > > > > > > > > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) > > > > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231) > > > > at > > > > > > > > > > kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356) > > > > at > > org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:843) > > > > at > > > > org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) > > > > > > > > > > > > > > > > > -- > > > -- Guozhang > > > > > > -- Regards Vamsi Subhash