Hi, Ok, good to know I’m not going totally mad. I think I may have been running around in circles unnecessarily <grrr> I am using kafka as part of an HDP installation (with Ambari). The Ambari interface is reporting my kafka version as 0.9.0.2.3 and indeed the output given previously give INFO: Kafka version : 0.9.0.1 (which doesn’t make particular sense). So I have ssh’d onto the server and looked at the libs for kafka and they are kafka_2.10-0.8.2.2.3.0.0-2557.jar so I’m guessing something is not quite right. This is incredibly frustrating as it looks like I am trying to connect to v0.9 kafka but it’s actually v0.8 which clearly is very different wrt/ zookeeper etc. This is also backup up by trying the kafka-console-consumer.sh (and all the other tools) ask for mandatory zookeeper options which shouldn’t be necessary as far as I understand it for v0.9.
I am currently looking at https://github.com/mxm/incubator-beam/blob/63bce07d8c6cc5e610ad24e915e2585fef582567/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java to see if I can use this code somehow to use Beam with Kafka v0.8. I am really hoping to as I have no option to upgrade currently and I really like the abstraction of Beam. Thanks Conrad From: Mingmin Xu <mingm...@gmail.com> Reply-To: "user@beam.apache.org" <user@beam.apache.org> Date: Thursday, 4 May 2017 at 17:59 To: "user@beam.apache.org" <user@beam.apache.org> Subject: Re: KafkaIO nothing received? @Conrad, Your code should be good to go, I can run it in my local env. There're two points you may have a check: 1). does the topic have data there, you can confirm with kafka cli 'bin/kafka-console-consumer.sh'; 2). is the port in bootstrapServers right? By default it's 9092. On Thu, May 4, 2017 at 9:05 AM, Conrad Crampton <conrad.cramp...@secdata.com<mailto:conrad.cramp...@secdata.com>> wrote: Hi, New to the group – ‘hello’! Just starting to look into Beam and I very much like the concepts, but have rather fallen at the first hurdle – that being trying to subscribe to a kafka topic and process results. Very simply the following code doesn’t get receive any records (the data is going into the queue) – I just get nothing. I have tried on both direct-runner and flink-runner (using the Quickstart as a base for options, mvn profile etc.) Code Pipeline p = Pipeline.create(options); List<String> topics = ImmutableList.of("test-http-logs-json"); PCollection<String> logs = p.apply(KafkaIO.<String, String>read() .withBootstrapServers("datanode2-cm1.mis-cds.local:6667,datanode3-cm1.mis-cds.local:6667,datanode6-cm1.mis-cds.local:6667") .withTopics(topics) .withKeyCoder(StringUtf8Coder.of()) .withValueCoder(StringUtf8Coder.of()) .withMaxNumRecords(10) .updateConsumerProperties(ImmutableMap.<String, Object>builder() .put("auto.offset.reset", (Object) "earliest") .put("group.id<http://group.id>", (Object) "http-logs-beam-json") .put("enable.auto.commit", (Object) "true") .put("receive.buffer.bytes", 1024 * 1024) .build()) // set a Coder for Key and Value .withoutMetadata()) .apply("Transform ", MapElements.via(new SimpleFunction<KV<String, String>, String>() { @Override public String apply(KV<String, String> input) { log.debug("{}", input.getValue()); return input.getKey() + " " + input.getValue(); } })); p.run(); Result: May 04, 2017 5:02:13 PM org.apache.kafka.common.config.AbstractConfig logAll INFO: ConsumerConfig values: metric.reporters = [] metadata.max.age.ms<http://metadata.max.age.ms> = 300000 value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer group.id<http://group.id> = http-logs-beam-json partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] reconnect.backoff.ms<http://reconnect.backoff.ms> = 50 sasl.kerberos.ticket.renew.window.factor = 0.8 max.partition.fetch.bytes = 1048576 bootstrap.servers = [datanode2-cm1.mis-cds.local:6667, datanode3-cm1.mis-cds.local:6667, datanode6-cm1.mis-cds.local:6667] retry.backoff.ms<http://retry.backoff.ms> = 100 sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.service.name<http://sasl.kerberos.service.name> = null sasl.kerberos.ticket.renew.jitter = 0.05 ssl.keystore.type = JKS ssl.trustmanager.algorithm = PKIX enable.auto.commit = true ssl.key.password = null fetch.max.wait.ms<http://fetch.max.wait.ms> = 500 sasl.kerberos.min.time.before.relogin = 60000 connections.max.idle.ms<http://connections.max.idle.ms> = 540000 ssl.truststore.password = null session.timeout.ms<http://session.timeout.ms> = 30000 metrics.num.samples = 2 client.id<http://client.id> = ssl.endpoint.identification.algorithm = null key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer ssl.protocol = TLS check.crcs = true request.timeout.ms<http://request.timeout.ms> = 40000 ssl.provider = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.keystore.location = null heartbeat.interval.ms<http://heartbeat.interval.ms> = 3000 auto.commit.interval.ms<http://auto.commit.interval.ms> = 5000 receive.buffer.bytes = 1048576 ssl.cipher.suites = null ssl.truststore.type = JKS security.protocol = PLAINTEXT ssl.truststore.location = null ssl.keystore.password = null ssl.keymanager.algorithm = SunX509 metrics.sample.window.ms<http://metrics.sample.window.ms> = 30000 fetch.min.bytes = 1 send.buffer.bytes = 131072 auto.offset.reset = earliest May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init> INFO: Kafka version : 0.9.0.1 May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init> INFO: Kafka commitId : 23c69d62a0cabf06 May 04, 2017 5:02:13 PM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource generateInitialSplits INFO: Partitions assigned to split 0 (total 1): test-http-logs-json-0 May 04, 2017 5:02:13 PM org.apache.kafka.common.config.AbstractConfig logAll INFO: ConsumerConfig values: metric.reporters = [] metadata.max.age.ms<http://metadata.max.age.ms> = 300000 value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer group.id<http://group.id> = http-logs-beam-json partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] reconnect.backoff.ms<http://reconnect.backoff.ms> = 50 sasl.kerberos.ticket.renew.window.factor = 0.8 max.partition.fetch.bytes = 1048576 bootstrap.servers = [datanode2-cm1.mis-cds.local:6667, datanode3-cm1.mis-cds.local:6667, datanode6-cm1.mis-cds.local:6667] retry.backoff.ms<http://retry.backoff.ms> = 100 sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.service.name<http://sasl.kerberos.service.name> = null sasl.kerberos.ticket.renew.jitter = 0.05 ssl.keystore.type = JKS ssl.trustmanager.algorithm = PKIX enable.auto.commit = true ssl.key.password = null fetch.max.wait.ms<http://fetch.max.wait.ms> = 500 sasl.kerberos.min.time.before.relogin = 60000 connections.max.idle.ms<http://connections.max.idle.ms> = 540000 ssl.truststore.password = null session.timeout.ms<http://session.timeout.ms> = 30000 metrics.num.samples = 2 client.id<http://client.id> = ssl.endpoint.identification.algorithm = null key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer ssl.protocol = TLS check.crcs = true request.timeout.ms<http://request.timeout.ms> = 40000 ssl.provider = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.keystore.location = null heartbeat.interval.ms<http://heartbeat.interval.ms> = 3000 auto.commit.interval.ms<http://auto.commit.interval.ms> = 5000 receive.buffer.bytes = 1048576 ssl.cipher.suites = null ssl.truststore.type = JKS security.protocol = PLAINTEXT ssl.truststore.location = null ssl.keystore.password = null ssl.keymanager.algorithm = SunX509 metrics.sample.window.ms<http://metrics.sample.window.ms> = 30000 fetch.min.bytes = 1 send.buffer.bytes = 131072 auto.offset.reset = earliest May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init> INFO: Kafka version : 0.9.0.1 May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init> INFO: Kafka commitId : 23c69d62a0cabf06 May 04, 2017 5:02:14 PM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader start INFO: Reader-0: reading from test-http-logs-json-0 starting at offset 0 May 04, 2017 5:02:14 PM org.apache.kafka.common.config.AbstractConfig logAll INFO: ConsumerConfig values: metric.reporters = [] metadata.max.age.ms<http://metadata.max.age.ms> = 300000 value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer group.id<http://group.id> = Reader-0_offset_consumer_1029147362_http-logs-beam-json partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] reconnect.backoff.ms<http://reconnect.backoff.ms> = 50 sasl.kerberos.ticket.renew.window.factor = 0.8 max.partition.fetch.bytes = 1048576 bootstrap.servers = [datanode2-cm1.mis-cds.local:6667, datanode3-cm1.mis-cds.local:6667, datanode6-cm1.mis-cds.local:6667] retry.backoff.ms<http://retry.backoff.ms> = 100 sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.service.name<http://sasl.kerberos.service.name> = null sasl.kerberos.ticket.renew.jitter = 0.05 ssl.keystore.type = JKS ssl.trustmanager.algorithm = PKIX enable.auto.commit = false ssl.key.password = null fetch.max.wait.ms<http://fetch.max.wait.ms> = 500 sasl.kerberos.min.time.before.relogin = 60000 connections.max.idle.ms<http://connections.max.idle.ms> = 540000 ssl.truststore.password = null session.timeout.ms<http://session.timeout.ms> = 30000 metrics.num.samples = 2 client.id<http://client.id> = ssl.endpoint.identification.algorithm = null key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer ssl.protocol = TLS check.crcs = true request.timeout.ms<http://request.timeout.ms> = 40000 ssl.provider = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.keystore.location = null heartbeat.interval.ms<http://heartbeat.interval.ms> = 3000 auto.commit.interval.ms<http://auto.commit.interval.ms> = 5000 receive.buffer.bytes = 1048576 ssl.cipher.suites = null ssl.truststore.type = JKS security.protocol = PLAINTEXT ssl.truststore.location = null ssl.keystore.password = null ssl.keymanager.algorithm = SunX509 metrics.sample.window.ms<http://metrics.sample.window.ms> = 30000 fetch.min.bytes = 1 send.buffer.bytes = 131072 auto.offset.reset = earliest May 04, 2017 5:02:14 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init> INFO: Kafka version : 0.9.0.1 May 04, 2017 5:02:14 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init> INFO: Kafka commitId : 23c69d62a0cabf06 Any suggestions as I have been on this now for a over a day with various attempts but nothing comes through. When connecting to a different topic (which I subscribe directly via Flink in another application and get data from), I can set the .put("auto.offset.reset", (Object) "earliest") to earliest and latest and see different values for the offset so kafka appears to be available/ visible etc. Many thanks Conrad SecureData, combating cyber threats ________________________________ The information contained in this message or any of its attachments may be privileged and confidential and intended for the exclusive use of the intended recipient. If you are not the intended recipient any disclosure, reproduction, distribution or other dissemination or use of this communications is strictly prohibited. The views expressed in this email are those of the individual and not necessarily of SecureData Europe Ltd. Any prices quoted are only valid if followed up by a formal written quote. SecureData Europe Limited. Registered in England & Wales 04365896. Registered Address: SecureData House, Hermitage Court, Hermitage Lane, Maidstone, Kent, ME16 9NT -- ---- Mingmin ***This email originated outside SecureData*** Click here<https://www.mailcontrol.com/sr/XC8IyZoRRIvGX2PQPOmvUqa7UuQeNDoMrRTXgSjgaLYaIh5CEL7EFXOiQfY7SUO4JKSem2rP9Blzr!jqbv!p6A==> to report this email as spam.