Hi, new to the group – 'hello'! I'm just starting to look into Beam and I very much like the concepts, but I have rather fallen at the first hurdle: trying to subscribe to a Kafka topic and process the results. Very simply, the following code doesn't receive any records (the data is going into the topic) – I just get nothing. I have tried both the direct runner and the Flink runner (using the Quickstart as a base for the options, mvn profiles, etc.).
Code:

    Pipeline p = Pipeline.create(options);
    List<String> topics = ImmutableList.of("test-http-logs-json");

    PCollection<String> logs = p
        .apply(KafkaIO.<String, String>read()
            .withBootstrapServers("datanode2-cm1.mis-cds.local:6667,datanode3-cm1.mis-cds.local:6667,datanode6-cm1.mis-cds.local:6667")
            .withTopics(topics)
            // set a Coder for key and value
            .withKeyCoder(StringUtf8Coder.of())
            .withValueCoder(StringUtf8Coder.of())
            .withMaxNumRecords(10)
            .updateConsumerProperties(ImmutableMap.<String, Object>builder()
                .put("auto.offset.reset", "earliest")
                .put("group.id", "http-logs-beam-json")
                .put("enable.auto.commit", "true")
                .put("receive.buffer.bytes", 1024 * 1024)
                .build())
            .withoutMetadata())
        .apply("Transform", MapElements.via(new SimpleFunction<KV<String, String>, String>() {
            @Override
            public String apply(KV<String, String> input) {
                log.debug("{}", input.getValue());
                return input.getKey() + " " + input.getValue();
            }
        }));

    p.run();

Result (one property per line; repeated, identical ConsumerConfig dumps elided):

    May 04, 2017 5:02:13 PM org.apache.kafka.common.config.AbstractConfig logAll
    INFO: ConsumerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        group.id = http-logs-beam-json
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [datanode2-cm1.mis-cds.local:6667, datanode3-cm1.mis-cds.local:6667, datanode6-cm1.mis-cds.local:6667]
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.keystore.type = JKS
        ssl.trustmanager.algorithm = PKIX
        enable.auto.commit = true
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        ssl.truststore.password = null
        session.timeout.ms = 30000
        metrics.num.samples = 2
        client.id =
        ssl.endpoint.identification.algorithm = null
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        ssl.protocol = TLS
        check.crcs = true
        request.timeout.ms = 40000
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 1048576
        ssl.cipher.suites = null
        ssl.truststore.type = JKS
        security.protocol = PLAINTEXT
        ssl.truststore.location = null
        ssl.keystore.password = null
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        auto.offset.reset = earliest
    May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
    INFO: Kafka version : 0.9.0.1
    May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
    INFO: Kafka commitId : 23c69d62a0cabf06
    May 04, 2017 5:02:13 PM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource generateInitialSplits
    INFO: Partitions assigned to split 0 (total 1): test-http-logs-json-0
    May 04, 2017 5:02:13 PM org.apache.kafka.common.config.AbstractConfig logAll
    INFO: ConsumerConfig values:
        [identical to the dump above]
    May 04, 2017 5:02:13 PM ...
    INFO: Kafka version : 0.9.0.1 / Kafka commitId : 23c69d62a0cabf06
    May 04, 2017 5:02:14 PM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader start
    INFO: Reader-0: reading from test-http-logs-json-0 starting at offset 0
    May 04, 2017 5:02:14 PM org.apache.kafka.common.config.AbstractConfig logAll
    INFO: ConsumerConfig values:
        [as above, except:]
        group.id = Reader-0_offset_consumer_1029147362_http-logs-beam-json
        enable.auto.commit = false
    May 04, 2017 5:02:14 PM ...
    INFO: Kafka version : 0.9.0.1 / Kafka commitId : 23c69d62a0cabf06

Any suggestions? I have been on this for over a day with various attempts, but nothing comes through. When connecting to a different topic (one I subscribe to directly via Flink in another application and do get data from), I can set "auto.offset.reset" to earliest or latest and see different offset values, so Kafka appears to be available/visible.

Many thanks

Conrad
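P.S. One small cleanup I noticed in my own code above: since the ImmutableMap builder is already typed <String, Object>, the explicit (Object) casts are unnecessary – String values go in directly and the int autoboxes to Integer. A minimal sketch of the same consumer overrides using only java.util (the class name is made up, purely illustrative):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, illustrative only: builds the same map of consumer
// property overrides as the updateConsumerProperties(...) call above,
// without any (Object) casts.
public class ConsumerOverrides {
    static Map<String, Object> build() {
        Map<String, Object> config = new HashMap<>();
        config.put("auto.offset.reset", "earliest");     // String, no cast needed
        config.put("group.id", "http-logs-beam-json");   // String, no cast needed
        config.put("enable.auto.commit", "true");        // String, as in the pipeline
        config.put("receive.buffer.bytes", 1024 * 1024); // int autoboxes to Integer
        return config;
    }
}
```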