Calling the standalone client ("org.apache.kafka" % "kafka-clients" % "0.9.0.0") from Scala, consumer.poll never returns. I've tried both assigning a TopicPartition and subscribing to the topic, with various timeouts, and I've quintuple-checked the config properties. Here's a Scala IDE worksheet:

val props = loadProperties(new StringBuilder("kafkaConsumer.properties"))
//> props : java.util.Properties = {key.deserializer=org.apache.kafka.common.serialization.StringDeserializer,
//| auto.commit.interval.ms=1000, bootstrap.servers=127.0.0.1:2181, enable.auto.commit=true, group.id=test,
//| value.deserializer=org.apache.kafka.common.serialization.LongDeserializer, session.timeout.ms=30000}
val topic = "debug-topic"
//> topic : String = debug-topic
val topicList = List(topic).asJava
//> topicList : java.util.List[String] = [debug-topic]
val consumer = new KafkaConsumer[String, String](props)
//> 15:07:22,501 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback.groovy]
//| 15:07:22,501 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback-test.xml]
//| 15:07:22,501 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Found resource [logback.xml] at [file:/Users/garystruthers/git/dendrites/target/scala-2.11/classes/logback.xml]
//| 15:07:22,502 |-WARN in ch.qos.logback.classic.LoggerContext[default] - Resource [logback.xml] occurs multiple times on the classpath.
//| 15:07:22,502 |-WARN in ch.qos.logback.classic.LoggerContext[default] - Resource [logback.xml] occurs at [file:/Users/garystruthers/git/dendrites/bin/logback.xml]
//| 15:07:22,502 |-WARN in ch.qos.logback.classic.LoggerContext[default] - Resource [logback.xml] occurs at [file:/Users/garystruthers/git/dendrites/target/scala-2.11/classes/logback.xml]
//| 15:07:22,592 |-INFO in ch.qos.logback.
//| Output exceeds cutoff limit.
val tp0 = new TopicPartition(topic, 0)
//> tp0 : org.apache.kafka.common.TopicPartition = debug-topic-0
val topicPartitions = List(tp0).asJava
//> topicPartitions : java.util.List[org.apache.kafka.common.TopicPartition] = [debug-topic-0]
consumer.assign(topicPartitions)
//consumer.subscribe(topicList)
val records = consumer.poll(1000)

Gary