Hi Raghu,
Yeah, the job just waits and does nothing. It reports the correct offset (this 
changes when I use ‘earliest’ or ‘latest’), but nothing is received. There are 
definitely messages in the queue. I am using Beam 0.6.
With my other application using Flink, I am using the FlinkKafkaConsumer08 
libraries (and not the FlinkKafkaConsumer09), as I am fairly sure I hit a 
similar problem with the 09 consumer: no errors reported, everything appears 
to work fine, but nothing is actually received in the streaming job.
Kafka 0.8 requires a zookeeper.connect property to be set, but I can’t set 
this using updateConsumerProperties as the value gets discarded.
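For reference, this is roughly what I tried (a sketch; the zookeeper host is a 
placeholder):

        // sketch: passing the 0.8 consumer's zookeeper setting through
        // updateConsumerProperties; the value seems to be dropped by KafkaIO
        .updateConsumerProperties(ImmutableMap.<String, Object>of(
                "zookeeper.connect", "zk-host.mis-cds.local:2181"))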
Thanks
Conrad

From: Raghu Angadi <rang...@google.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Thursday, 4 May 2017 at 18:27
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: KafkaIO nothing received?

Conrad,

It does not look like there is a version incompatibility; you would see errors 
during initialization otherwise. The log line "INFO: Reader-0: reading from 
test-http-logs-json-0 starting at offset 0" says the Kafka consumer was able 
to connect to the servers.

Does the job just wait inside p.run()? Which version of Beam are you using? If 
it is just waiting for records, please ensure the topic has messages (using 
kafka-console-consumer.sh etc.). Alternatively, you can try reading from the 
other topic you mentioned that worked fine.

Raghu.

On Thu, May 4, 2017 at 10:07 AM, Conrad Crampton 
<conrad.cramp...@secdata.com> wrote:
Hi,
Ok, good to know I’m not going totally mad.
I think I may have been running around in circles unnecessarily <grrr>
I am using Kafka as part of an HDP installation (with Ambari). The Ambari 
interface reports my Kafka version as 0.9.0.2.3, and indeed the output given 
previously shows
INFO: Kafka version : 0.9.0.1 (which doesn’t make particular sense). So I have 
ssh’d onto the server and looked at the libs for Kafka, and they are 
kafka_2.10-0.8.2.2.3.0.0-2557.jar, so I’m guessing something is not quite right. 
This is incredibly frustrating, as it looks like I am trying to connect to a 
v0.9 Kafka but it’s actually v0.8, which clearly is very different wrt/ 
zookeeper etc. This is also backed up by kafka-console-consumer.sh (and all 
the other tools) asking for mandatory zookeeper options, which shouldn’t be 
necessary for v0.9 as far as I understand it.

I am currently looking at 
https://github.com/mxm/incubator-beam/blob/63bce07d8c6cc5e610ad24e915e2585fef582567/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
to see if I can use that code to get Beam working with Kafka v0.8. I am really 
hoping to, as I have no option to upgrade currently and I really like the 
abstraction of Beam.
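If I read that example right, it bypasses KafkaIO and wraps Flink’s own 0.8 
consumer as an unbounded Beam source. A minimal sketch of that approach, 
assuming the UnboundedFlinkSource wrapper from that branch is available 
(Flink runner only; the zookeeper host is a placeholder):

import java.util.Properties;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

Properties props = new Properties();
props.setProperty("zookeeper.connect", "zk-host.mis-cds.local:2181"); // placeholder
props.setProperty("bootstrap.servers", "datanode2-cm1.mis-cds.local:6667");
props.setProperty("group.id", "http-logs-beam-json");

// Flink's 0.8 consumer does its own zookeeper-based offset management,
// so no KafkaIO consumer properties are involved
FlinkKafkaConsumer08<String> kafkaConsumer = new FlinkKafkaConsumer08<>(
        "test-http-logs-json", new SimpleStringSchema(), props);

PCollection<String> logs = p.apply(
        Read.from(UnboundedFlinkSource.of(kafkaConsumer)));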

Thanks
Conrad


From: Mingmin Xu <mingm...@gmail.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Thursday, 4 May 2017 at 17:59
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: KafkaIO nothing received?

@Conrad,
Your code should be good to go; I can run it in my local env. There are two 
points you may check:
1). does the topic have data? You can confirm with the Kafka CLI 
'bin/kafka-console-consumer.sh' (a bare-consumer sketch of this check follows);
2). is the port in bootstrapServers right? By default it's 9092.
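For the first point, the same check can be done with a bare 0.9 consumer 
outside Beam. A minimal sketch (the group id is a placeholder; hosts as in 
the original pipeline):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "datanode2-cm1.mis-cds.local:6667");
props.put("group.id", "sanity-check"); // placeholder group id
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("test-http-logs-json"));
    // poll for up to 10 seconds; a non-zero count confirms the topic has data
    ConsumerRecords<String, String> records = consumer.poll(10_000);
    System.out.println("Fetched " + records.count() + " records");
}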


On Thu, May 4, 2017 at 9:05 AM, Conrad Crampton 
<conrad.cramp...@secdata.com> wrote:
Hi,
New to the group – ‘hello’!

Just starting to look into Beam and I very much like the concepts, but I have 
rather fallen at the first hurdle: subscribing to a Kafka topic and processing 
the results.
Very simply, the following code doesn’t receive any records (the data is 
going into the queue); I just get nothing.
I have tried both the direct-runner and the flink-runner (using the Quickstart 
as a base for options, mvn profile etc.)

Code

Pipeline p = Pipeline.create(options);

List<String> topics = ImmutableList.of("test-http-logs-json");

PCollection<String> logs = p.apply(KafkaIO.<String, String>read()
        .withBootstrapServers("datanode2-cm1.mis-cds.local:6667,datanode3-cm1.mis-cds.local:6667,datanode6-cm1.mis-cds.local:6667")
        .withTopics(topics)
        // set a Coder for Key and Value
        .withKeyCoder(StringUtf8Coder.of())
        .withValueCoder(StringUtf8Coder.of())
        .withMaxNumRecords(10)
        .updateConsumerProperties(ImmutableMap.<String, Object>builder()
                .put("auto.offset.reset", (Object) "earliest")
                .put("group.id", (Object) "http-logs-beam-json")
                .put("enable.auto.commit", (Object) "true")
                .put("receive.buffer.bytes", 1024 * 1024)
                .build())
        .withoutMetadata())
        .apply("Transform ", MapElements.via(new SimpleFunction<KV<String, String>, String>() {
            @Override
            public String apply(KV<String, String> input) {
                log.debug("{}", input.getValue());
                return input.getKey() + " " + input.getValue();
            }
        }));

p.run();


Result:
May 04, 2017 5:02:13 PM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        group.id = http-logs-beam-json
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [datanode2-cm1.mis-cds.local:6667, datanode3-cm1.mis-cds.local:6667, datanode6-cm1.mis-cds.local:6667]
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.keystore.type = JKS
        ssl.trustmanager.algorithm = PKIX
        enable.auto.commit = true
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        ssl.truststore.password = null
        session.timeout.ms = 30000
        metrics.num.samples = 2
        client.id =
        ssl.endpoint.identification.algorithm = null
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        ssl.protocol = TLS
        check.crcs = true
        request.timeout.ms = 40000
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 1048576
        ssl.cipher.suites = null
        ssl.truststore.type = JKS
        security.protocol = PLAINTEXT
        ssl.truststore.location = null
        ssl.keystore.password = null
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        auto.offset.reset = earliest

May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version : 0.9.0.1
May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : 23c69d62a0cabf06
May 04, 2017 5:02:13 PM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource generateInitialSplits
INFO: Partitions assigned to split 0 (total 1): test-http-logs-json-0
May 04, 2017 5:02:13 PM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values: (identical to the block above)
May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version : 0.9.0.1
May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : 23c69d62a0cabf06
May 04, 2017 5:02:14 PM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader start
INFO: Reader-0: reading from test-http-logs-json-0 starting at offset 0
May 04, 2017 5:02:14 PM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values: (as above, except)
        group.id = Reader-0_offset_consumer_1029147362_http-logs-beam-json
        enable.auto.commit = false


May 04, 2017 5:02:14 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version : 0.9.0.1
May 04, 2017 5:02:14 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : 23c69d62a0cabf06


Any suggestions? I have been on this now for over a day with various attempts, 
but nothing comes through.
When connecting to a different topic (one I subscribe to directly via Flink in 
another application and do get data from), I can set "auto.offset.reset" to 
either earliest or latest and see different values for the offset, so Kafka 
appears to be available/visible etc.

Many thanks
Conrad




--
----
Mingmin

