Hi,
New to the group – ‘hello’!

Just starting to look into Beam and I very much like the concepts, but I have 
rather fallen at the first hurdle – trying to subscribe to a Kafka topic and 
process the results.
Put simply, the following code doesn't receive any records (data is definitely 
going into the topic) – I just get nothing.
I have tried both the direct-runner and the flink-runner (using the Quickstart 
as a base for the options, mvn profiles, etc.).

Code:

Pipeline p = Pipeline.create(options);

List<String> topics = ImmutableList.of("test-http-logs-json");

PCollection<String> logs = p
    .apply(KafkaIO.<String, String>read()
        .withBootstrapServers(
            "datanode2-cm1.mis-cds.local:6667,datanode3-cm1.mis-cds.local:6667,datanode6-cm1.mis-cds.local:6667")
        .withTopics(topics)
        // set a Coder for Key and Value
        .withKeyCoder(StringUtf8Coder.of())
        .withValueCoder(StringUtf8Coder.of())
        .withMaxNumRecords(10)
        .updateConsumerProperties(ImmutableMap.<String, Object>builder()
            .put("auto.offset.reset", "earliest")
            .put("group.id", "http-logs-beam-json")
            .put("enable.auto.commit", "true")
            .put("receive.buffer.bytes", 1024 * 1024)
            .build())
        .withoutMetadata())
    .apply("Transform", MapElements.via(new SimpleFunction<KV<String, String>, String>() {
        @Override
        public String apply(KV<String, String> input) {
            log.debug("{}", input.getValue());
            return input.getKey() + " " + input.getValue();
        }
    }));

p.run();
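
One thing I'm not sure about: withMaxNumRecords(10) makes the read bounded, and 
I don't know whether p.run() blocks until those 10 records have been read on 
every runner. Would I need to wait on the result explicitly, something like the 
following (just a guess on my part, not verified)?

PipelineResult result = p.run();
result.waitUntilFinish(); // block the main thread until the pipeline finishes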


Result:
May 04, 2017 5:02:13 PM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        value.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
        group.id = http-logs-beam-json
        partition.assignment.strategy = 
[org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [datanode2-cm1.mis-cds.local:6667, 
datanode3-cm1.mis-cds.local:6667, datanode6-cm1.mis-cds.local:6667]
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.keystore.type = JKS
        ssl.trustmanager.algorithm = PKIX
        enable.auto.commit = true
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        ssl.truststore.password = null
        session.timeout.ms = 30000
        metrics.num.samples = 2
        client.id =
        ssl.endpoint.identification.algorithm = null
        key.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
        ssl.protocol = TLS
        check.crcs = true
        request.timeout.ms = 40000
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 1048576
        ssl.cipher.suites = null
        ssl.truststore.type = JKS
        security.protocol = PLAINTEXT
        ssl.truststore.location = null
        ssl.keystore.password = null
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        auto.offset.reset = earliest

May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo 
<init>
INFO: Kafka version : 0.9.0.1
May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo 
<init>
INFO: Kafka commitId : 23c69d62a0cabf06
May 04, 2017 5:02:13 PM 
org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource generateInitialSplits
INFO: Partitions assigned to split 0 (total 1): test-http-logs-json-0
May 04, 2017 5:02:13 PM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values: [identical to the block above, omitted]

May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo 
<init>
INFO: Kafka version : 0.9.0.1
May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo 
<init>
INFO: Kafka commitId : 23c69d62a0cabf06
May 04, 2017 5:02:14 PM 
org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader start
INFO: Reader-0: reading from test-http-logs-json-0 starting at offset 0
May 04, 2017 5:02:14 PM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        value.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
        group.id = Reader-0_offset_consumer_1029147362_http-logs-beam-json
        partition.assignment.strategy = 
[org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [datanode2-cm1.mis-cds.local:6667, 
datanode3-cm1.mis-cds.local:6667, datanode6-cm1.mis-cds.local:6667]
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.keystore.type = JKS
        ssl.trustmanager.algorithm = PKIX
        enable.auto.commit = false
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        ssl.truststore.password = null
        session.timeout.ms = 30000
        metrics.num.samples = 2
        client.id =
        ssl.endpoint.identification.algorithm = null
        key.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
        ssl.protocol = TLS
        check.crcs = true
        request.timeout.ms = 40000
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 1048576
        ssl.cipher.suites = null
        ssl.truststore.type = JKS
        security.protocol = PLAINTEXT
        ssl.truststore.location = null
        ssl.keystore.password = null
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        auto.offset.reset = earliest

May 04, 2017 5:02:14 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo 
<init>
INFO: Kafka version : 0.9.0.1
May 04, 2017 5:02:14 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo 
<init>
INFO: Kafka commitId : 23c69d62a0cabf06


Any suggestions? I have been on this for over a day now with various attempts, 
but nothing comes through.
When connecting to a different topic (one I subscribe to directly via Flink in 
another application, and do get data from), I can set auto.offset.reset to 
either earliest or latest and see different values for the offset, so Kafka 
itself appears to be available/visible.
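
In case it helps, this is the plain (non-Beam) consumer check I'm going to try 
next, to rule Beam in or out – just a sketch using the same 0.9 client, with a 
throwaway group.id so previously committed offsets can't hide anything:

// imports: java.util.Collections, java.util.Properties,
//          org.apache.kafka.clients.consumer.*
Properties props = new Properties();
props.put("bootstrap.servers", "datanode2-cm1.mis-cds.local:6667");
props.put("group.id", "sanity-check-" + System.currentTimeMillis()); // fresh group
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-http-logs-json"));
try {
    for (int i = 0; i < 10; i++) {
        // poll for up to a second at a time and print whatever arrives
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset=%d key=%s value=%s%n",
                record.offset(), record.key(), record.value());
        }
    }
} finally {
    consumer.close();
}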

Many thanks
Conrad


