If you are using it for tests, this works with Kafka 10 (tune broker
configs as per your req)

public class TestKafkaCluster {
    final KafkaServer kafkaServer;
    final ZkClient zkClient;
    private String zkConnectionString;

    public TestKafkaCluster(String zkConnectionString, int kafkaPort)
throws Exception {
        this.zkConnectionString = zkConnectionString;
        zkClient = new ZkClient(zkConnectionString,
                30000, 30000, ZKStringSerializer$.MODULE$);
        //zkClient.createPersistent("/flo/kafka", true);
        final KafkaConfig config = getKafkaConfig(zkConnectionString,
kafkaPort);
        config.port();
        final Time mock = new MockTime();
        kafkaServer = TestUtils.createServer(config, mock);
    }

    private static KafkaConfig getKafkaConfig(final String
zkConnectString, int port) {
        final Properties brokerConfig = TestUtils.createBrokerConfig(1,
                zkConnectString,
                false,
                false,
                port,
                Option.apply(SecurityProtocol.PLAINTEXT),
                Option.empty(),
                Option.empty(),
                false,
                false,
                0,
                false,
                0,
                false,
                0,
                Option.empty());
        brokerConfig.put("default.replication.factor", String.valueOf(1));
        return new KafkaConfig(brokerConfig);
    }

    public KafkaServer getKafkaServer() {
        return kafkaServer;
    }

    public void stop() throws IOException {
        kafkaServer.shutdown();
        zkClient.close();
    }

    public String getZkConnectionString() {
        return zkConnectionString;
    }
}


On Fri, Jun 17, 2016 at 2:18 AM, Ismael Juma <ism...@juma.me.uk> wrote:

> Try using kafka.server.KafkaServerStartable instead. It should do the right
> thing.
>
> Ismael
>
> On Thu, Jun 16, 2016 at 7:18 PM, Subhash Agrawal <agraw...@opentext.com>
> wrote:
>
> > Thanks Ismael.
> > I am instantiating kafkaserver instance like this.
> > new KafkaServer(kafkaConfig,null,null);
> >
> > I tried to use
> > new KafkaServer(kafkaConfig); but it does not compile with kafka 0.10.0.
> >
> > All the example I see uses
> > new KafkaServer(kafkaConfig);
> >
> > Do we support new KafkaServer(kafkaConfig);   with kafka 0.10.0? if not,
> > how can I pass
> > these parameters? It used to work with kafka 0.7.1.
> >
> > Thanks
> > Subhash Agrawal
> >
> >
> > -----Original Message-----
> > From: isma...@gmail.com [mailto:isma...@gmail.com] On Behalf Of Ismael
> > Juma
> > Sent: Thursday, June 16, 2016 1:38 AM
> > To: dev@kafka.apache.org
> > Subject: Re: Embedding zookeeper and kafka in java process.
> >
> > Hi Subhash,
> >
> > This would happen if `null` is passed as the `threadNamePrefix` argument
> > when instantiating `KafkaServer`:
> >
> > class KafkaServer(val config: KafkaConfig, time: Time = SystemTime,
> > threadNamePrefix: Option[String] = None) extends Logging with
> > KafkaMetricsGroup
> >
> > How are you starting Kafka in your Java process?
> >
> > Ismael
> >
> > On Thu, Jun 16, 2016 at 1:26 AM, Subhash Agrawal <agraw...@opentext.com>
> > wrote:
> >
> > > Thanks for quick response.
> > > I started zookeeper via zookeeper-server-start.bat and started kafka
> via
> > > my java process and I saw same error.
> > > But if I start zookeeper via java process and start kafka via
> > > kafka-server-start.bat, t works fine.
> > > It means it is not caused due to both getting started in same process.
> It
> > > must be some kafka specific issue.
> > >
> > > Subhash Agrawal
> > >
> > > -----Original Message-----
> > > From: Guozhang Wang [mailto:wangg...@gmail.com]
> > > Sent: Wednesday, June 15, 2016 3:42 PM
> > > To: dev@kafka.apache.org
> > > Subject: Re: Embedding zookeeper and kafka in java process.
> > >
> > > It seems "scala.MatchError: null" are not related to the settings that
> ZK
> > > and Kafka is embedded in the same process, and the only case that I can
> > > think of related is this:
> > https://issues.apache.org/jira/browse/KAFKA-940.
> > >
> > > Could you clarify if you start these two services on two processes, the
> > > issue goes away?
> > >
> > > Guozhang
> > >
> > > On Wed, Jun 15, 2016 at 1:56 PM, Subhash Agrawal <
> agraw...@opentext.com>
> > > wrote:
> > >
> > > > Hi All,
> > > > I am embedding Kafka 0.10.0 and corresponding zookeeper in java
> > process.
> > > > In this process, I start zookeeper first and then wait for 10 seconds
> > and
> > > > then start kafka. These are all running in the same process. Toward
> the
> > > > end of kafka startup, I see following exception. It seems zookeeper
> is
> > > not
> > > > able
> > > > to add the newly created kafka instance. Have you seen this error
> > > > earlier?  I have only single node kafka.
> > > >
> > > > Let me know if you have any suggestions. I will really appreciate any
> > > help
> > > > on this.
> > > >
> > > > Thanks
> > > > Subhash Agrawal.
> > > >
> > > > [2016-06-15 13:39:39,616] INFO [Logserver_Starter] Registered broker
> 0
> > at
> > > > path /brokers/ids/0 with addresses: PLAINTEXT ->
> > > > EndPoint(localhost,8392,PLAINTEXT) (kafka.utils.ZkUtils)
> > > > [2016-06-15 13:39:39,617] WARN [Logserver_Starter] No meta.properties
> > > file
> > > > under dir C:\development \newkafka-logs\meta.properties
> > > > (kafka.server.BrokerMetadataCheckpoint)
> > > > [2016-06-15 13:39:39,627] INFO
> [ZkClient-EventThread-24-localhost:2181]
> > > > New leader is 0
> > > (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> > > > [2016-06-15 13:39:39,629] INFO
> [ZkClient-EventThread-24-localhost:2181]
> > > > [BrokerChangeListener on Controller 0]: Broker change listener fired
> > for
> > > > path /brokers/ids with children 0
> > > > (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
> > > > [2016-06-15 13:39:39,638] INFO [Logserver_Starter] Kafka version :
> > > > 0.10.0.0 (org.apache.kafka.common.utils.AppInfoParser)
> > > > [2016-06-15 13:39:39,638] INFO [Logserver_Starter] Kafka commitId :
> > > > b8642491e78c5a13 (org.apache.kafka.common.utils.AppInfoParser)
> > > > [2016-06-15 13:39:39,639] INFO [Logserver_Starter] [Kafka Server 0],
> > > > started (kafka.server.KafkaServer)
> > > > [2016-06-15 13:39:39,806] INFO
> [ZkClient-EventThread-24-localhost:2181]
> > > > [BrokerChangeListener on Controller 0]: Newly added brokers: 0,
> deleted
> > > > brokers: , all live brokers: 0
> > > > (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
> > > > [2016-06-15 13:39:39,808] DEBUG
> > [ZkClient-EventThread-24-localhost:2181]
> > > > [Channel manager on controller 0]: Controller 0 trying to connect to
> > > broker
> > > > 0 (kafka.controller.ControllerChannelManager)
> > > > [2016-06-15 13:39:39,818] ERROR
> > [ZkClient-EventThread-24-localhost:2181]
> > > > [BrokerChangeListener on Controller 0]: Error while handling broker
> > > changes
> > > > (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
> > > > scala.MatchError: null
> > > >                 at
> > > >
> > >
> >
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:122)
> > > >                 at
> > > >
> > >
> >
> kafka.controller.ControllerChannelManager.addBroker(ControllerChannelManager.scala:74)
> > > >                 at
> > > >
> > >
> >
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$4.apply(ReplicaStateMachine.scala:372)
> > > >                 at
> > > >
> > >
> >
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$4.apply(ReplicaStateMachine.scala:372)
> > > >                 at
> > > > scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
> > > >                 at
> > > >
> > >
> >
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:372)
> > > >                 at
> > > >
> > >
> >
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
> > > >                 at
> > > >
> > >
> >
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
> > > >                 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > >                 at
> > > >
> > >
> >
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
> > > >                 at
> > > >
> > >
> >
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
> > > >                 at
> > > >
> > >
> >
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
> > > >                 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
> > > >                 at
> > > >
> > >
> >
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
> > > >                 at
> > org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:843)
> > > >                 at
> > > > org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>



-- 
Regards
Vamsi Subhash

Reply via email to