This is how I set up my JUnit test to keep Kafka and ZooKeeper running for the duration of the test:

    static {
        embeddedZKServer = new TestingServer();
        embeddedKafkaServerPort = TestUtils.RandomPort();
        Properties brokerProperties = TestUtils.createBrokerConfig(
            1, embeddedZKServer.getConnectString(), true, false,
            TestUtils.RandomPort(), scala.None$.MODULE$, scala.None$.MODULE$,
            true, false, TestUtils.RandomPort(), false, TestUtils.RandomPort(),
            false, TestUtils.RandomPort());
        brokerProperties.setProperty("zookeeper.connect", embeddedZKServer.getConnectString());
        embeddedKafkaServer = TestUtils.createServer(new KafkaConfig(brokerProperties), new MockTime());
        KAFKA_PORT = embeddedKafkaServer.socketServer().boundPort(SecurityProtocol.PLAINTEXT)
        println "${"\n"*10}Kafka is running on port ${KAFKA_PORT}"
    }

This is the unit test:

    @Test
    public void testPost() {
        String doc = System.getResourceAsStream("/document.xml").text
        def entity = new Entity<String>(doc, MediaType.APPLICATION_XML_TYPE)

        Properties props = new Properties();
        props.put("bootstrap.servers", ["localhost:$KAFKA_PORT".toString()]);
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        ["topic1", "topic2"].each { topic ->
            Thread.start {
                KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
                consumer.subscribe(Arrays.asList(topic));
                def done = false
                while (!done) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    Thread.sleep(100)
                    if (records.count() > 5) {
                        done = true
                        println "Done!"
                        println records.iterator().next().value()
                    }
                }
            }
        }

        0.upto(25) { /* operation that posts to kafka */ }
        Thread.sleep(120000)
    }

Oddly enough, things were running fine yesterday.
Now I get the following errors, which show that the threaded code never completes:

    org.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 1 : {topic1=LEADER_NOT_AVAILABLE}
    org.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 1 : {topic2=LEADER_NOT_AVAILABLE}
    org.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 3 : {topic2=LEADER_NOT_AVAILABLE}
    org.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 3 : {topic1=LEADER_NOT_AVAILABLE}
    org.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 5 : {topic2=LEADER_NOT_AVAILABLE}
    org.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 5 : {topic1=LEADER_NOT_AVAILABLE}

Any ideas? Are there any samples that I can use to better work on this?

Thanks,
Mike