I have written a KafkaIO module that supports Kafka 0.8. The code is here:
https://github.com/1haodian/beam/tree/kafkaio8/sdks/java/io/kafka8

From: Conrad Crampton [mailto:conrad.cramp...@secdata.com]
Sent: 11 May 2017 1:29
To: user@beam.apache.org
Subject: Re: KafkaIO nothing received?

Yes, will do and thanks for the support and help


From: Raghu Angadi <rang...@google.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Wednesday, 10 May 2017 at 18:18
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: KafkaIO nothing received?

Thanks for the confirmation, Conrad. Please let us know how Beam works for you 
once Kafka is upgraded to 0.9+.

I wish the Kafka 0.9 consumer failed more explicitly, rather than just hanging.

Raghu.

On Wed, May 10, 2017 at 5:17 AM, Conrad Crampton <conrad.cramp...@secdata.com> wrote:
Hi Raghu,
I would like to be running 0.9 Kafka servers, but I can confirm that I am 
running 0.8.2 – off the HDP 2.3.0 stack. The confusion lies (I think) in the 
HDP upgrade having a bug in the way Ambari reports the various versions of the 
stack – specifically it reports Kafka as 0.9.0.1 – so I have been trying to use 
the equivalent client version. It is all very odd in that I am using a Nifi 
processor (PublishKafka), which is a v0.9 producer and appears to create 
messages for the topic, yet I can only use a 0.8 consumer to read from it. Now 
that I have cleared up the fact that I am in fact saddled with a 0.8.2 server, 
I can move on and use the appropriate clients – alas, not Beam for the moment 
(given it supports only Kafka 0.9+). I know we are on 0.8.2 on the server, as 
the version is in the names of the kafka jars, i.e. kafka_2.10-0.8.2.2.3.0.0-2557.jar.
So until I can get my HDP upgraded to include a later version of Kafka I am 
going to have to use Flink (or Spark etc.) for my current application, but once 
upgraded I will definitely come back to Beam, as it appears to be a great 
product with terrific community support.
Thanks again,
Conrad

From: Raghu Angadi <rang...@google.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Wednesday, 10 May 2017 at 00:20
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: KafkaIO nothing received?

Hi Conrad,

The Kafka consumer in Beam is 0.9 or above. Almost certainly you are running 
0.9 or newer servers. I don't think the new 0.9 client can talk to old brokers 
(but 0.9 brokers can talk to older clients). How did you confirm the server 
version? You can check the server log. But I might be mistaken about this 
incompatibility.

Can you post a 'jstack' of the application when it is stuck (assuming you are 
running with the DirectRunner)?
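For example, something along these lines (assuming the pipeline is running in 
a local JVM; <pid> is whatever jps reports for it):

jps -lm                          # list running JVMs to find the pipeline's pid
jstack <pid> > pipeline.jstack   # dump all thread stacks of the stuck process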

> Kafka 0.8 requires a zookeeper connect property to be set, but I can’t set 
> this using updateConsumerProperties as the value gets discarded.

KafkaIO does not place any restrictions on ConsumerConfig except for the key 
and value deserializers. The message about discarding these would be from the 
Kafka consumer itself. I think it ignores configuration settings that it does 
not know about and logs them (see 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L721).
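As a minimal sketch of what I mean (the broker, topic, and group names below 
are placeholders; KafkaIO forwards the map to the consumer as-is):

KafkaIO.<String, String>read()
    .withBootstrapServers("broker1:9092")
    .withTopics(ImmutableList.of("some-topic"))
    .withKeyCoder(StringUtf8Coder.of())
    .withValueCoder(StringUtf8Coder.of())
    .updateConsumerProperties(ImmutableMap.<String, Object>of(
        "group.id", "some-group",           // recognized and used by the 0.9 consumer
        "zookeeper.connect", "zk1:2181"));  // unknown to the 0.9 consumer: logged and ignored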

Raghu.

On Mon, May 8, 2017 at 1:57 AM, Conrad Crampton <conrad.cramp...@secdata.com> wrote:
Hi Raghu,
Yeah, the job just waits and does nothing. It reports the correct offset (this 
changes when I use ‘earliest’ or ‘latest’), but nothing is received. There are 
definitely messages in the queue. I am using Beam 0.6.
With my other application using Flink, I use the FlinkKafkaConsumer08 
libraries (and not FlinkKafkaConsumer09), as I am sure I had a similar problem 
then, i.e. no errors reported and everything appears to work fine, but nothing 
is actually received in the streaming job.
Kafka 0.8 requires a zookeeper connect property to be set, but I can’t set this 
using updateConsumerProperties as the value gets discarded.
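For comparison, this is roughly what my working 0.8 setup looks like in plain 
Flink (a sketch only – the zookeeper address and group id are placeholders; it 
is the 0.8 consumer that needs zookeeper.connect):

import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class Kafka08SmokeTest {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "datanode2-cm1.mis-cds.local:6667");
        props.setProperty("zookeeper.connect", "zkhost:2181"); // placeholder; mandatory for the 0.8 consumer
        props.setProperty("group.id", "http-logs-flink-test");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // print every record so it is obvious whether anything is consumed
        env.addSource(new FlinkKafkaConsumer08<>("test-http-logs-json", new SimpleStringSchema(), props))
           .print();
        env.execute("kafka-0.8-smoke-test");
    }
}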
Thanks
Conrad

From: Raghu Angadi <rang...@google.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Thursday, 4 May 2017 at 18:27
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: KafkaIO nothing received?

Conrad,

It does not look like there is a version incompatibility; you would see errors 
during initialization otherwise. The log line "INFO: Reader-0: reading from 
test-http-logs-json-0 starting at offset 0" says the Kafka consumer was able to 
connect to the servers.

Does the job just wait inside p.run()? What version of Beam are you using? If 
it is just waiting for records, please ensure the topic has messages (using 
kafka-console-consumer.sh etc.). Alternatively, you can try reading from the 
other topic you mentioned that worked fine.

Raghu.

On Thu, May 4, 2017 at 10:07 AM, Conrad Crampton <conrad.cramp...@secdata.com> wrote:
Hi,
Ok, good to know I’m not going totally mad.
I think I may have been running around in circles unnecessarily <grrr>.
I am using Kafka as part of an HDP installation (with Ambari). The Ambari 
interface reports my Kafka version as 0.9.0.2.3, and indeed the output given 
previously shows
INFO: Kafka version : 0.9.0.1 (which doesn’t make particular sense). So I have 
ssh’d onto the server and looked at the libs for Kafka; they are 
kafka_2.10-0.8.2.2.3.0.0-2557.jar, so I’m guessing something is not quite right. 
This is incredibly frustrating, as it looks like I am trying to connect to a 
v0.9 Kafka but it’s actually v0.8, which clearly is very different wrt/ 
zookeeper etc. This is also backed up by kafka-console-consumer.sh (and all the 
other tools) asking for mandatory zookeeper options, which shouldn’t be 
necessary for v0.9 as far as I understand it.

I am currently looking at 
https://github.com/mxm/incubator-beam/blob/63bce07d8c6cc5e610ad24e915e2585fef582567/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
to see if I can use this code somehow to make Beam work with Kafka v0.8. I am 
really hoping to, as I have no option to upgrade currently and I really like 
the abstraction of Beam.

Thanks
Conrad


From: Mingmin Xu <mingm...@gmail.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Thursday, 4 May 2017 at 17:59
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: KafkaIO nothing received?
Subject: Re: KafkaIO nothing received?

@Conrad,
Your code should be good to go; I can run it in my local env. There are two 
points you may check:
1) does the topic have data? You can confirm with the kafka cli 
'bin/kafka-console-consumer.sh' (example invocations below);
2) is the port in bootstrapServers right? By default it's 9092.
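For example (zkhost/broker1 are placeholders; on 0.8 the console consumer reads 
via zookeeper, while the 0.9 "new" consumer can talk to a broker directly):

# Kafka 0.8.x:
bin/kafka-console-consumer.sh --zookeeper zkhost:2181 --topic test-http-logs-json --from-beginning
# Kafka 0.9+:
bin/kafka-console-consumer.sh --new-consumer --bootstrap-server broker1:9092 --topic test-http-logs-json --from-beginning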


On Thu, May 4, 2017 at 9:05 AM, Conrad Crampton <conrad.cramp...@secdata.com> wrote:
Hi,
New to the group – ‘hello’!

Just starting to look into Beam, and I very much like the concepts, but I have 
rather fallen at the first hurdle – that being trying to subscribe to a Kafka 
topic and process the results.
Very simply, the following code doesn’t receive any records (the data is going 
into the queue) – I just get nothing.
I have tried both the direct-runner and the flink-runner (using the Quickstart 
as a base for options, mvn profiles etc.); an example invocation is below.
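Roughly like this (the main class is a placeholder for my pipeline class; the 
-P profiles come from the quickstart pom):

mvn compile exec:java -Dexec.mainClass=com.example.HttpLogsPipeline -Pdirect-runner
mvn compile exec:java -Dexec.mainClass=com.example.HttpLogsPipeline -Pflink-runner -Dexec.args="--runner=FlinkRunner"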

Code

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

Pipeline p = Pipeline.create(options);

List<String> topics = ImmutableList.of("test-http-logs-json");

PCollection<String> logs = p.apply(KafkaIO.<String, String>read()
        .withBootstrapServers("datanode2-cm1.mis-cds.local:6667,datanode3-cm1.mis-cds.local:6667,datanode6-cm1.mis-cds.local:6667")
        .withTopics(topics)
        // set a Coder for key and value
        .withKeyCoder(StringUtf8Coder.of())
        .withValueCoder(StringUtf8Coder.of())
        .withMaxNumRecords(10)
        .updateConsumerProperties(ImmutableMap.<String, Object>builder()
                .put("auto.offset.reset", (Object) "earliest")
                .put("group.id", (Object) "http-logs-beam-json")
                .put("enable.auto.commit", (Object) "true")
                .put("receive.buffer.bytes", 1024 * 1024)
                .build())
        .withoutMetadata())
        .apply("Transform ", MapElements.via(new SimpleFunction<KV<String, String>, String>() {
            @Override
            public String apply(KV<String, String> input) {
                log.debug("{}", input.getValue());
                return input.getKey() + " " + input.getValue();
            }
        }));

p.run();


Result:
May 04, 2017 5:02:13 PM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        group.id = http-logs-beam-json
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [datanode2-cm1.mis-cds.local:6667, datanode3-cm1.mis-cds.local:6667, datanode6-cm1.mis-cds.local:6667]
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.keystore.type = JKS
        ssl.trustmanager.algorithm = PKIX
        enable.auto.commit = true
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        ssl.truststore.password = null
        session.timeout.ms = 30000
        metrics.num.samples = 2
        client.id =
        ssl.endpoint.identification.algorithm = null
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        ssl.protocol = TLS
        check.crcs = true
        request.timeout.ms = 40000
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 1048576
        ssl.cipher.suites = null
        ssl.truststore.type = JKS
        security.protocol = PLAINTEXT
        ssl.truststore.location = null
        ssl.keystore.password = null
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        auto.offset.reset = earliest

May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version : 0.9.0.1
May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : 23c69d62a0cabf06
May 04, 2017 5:02:13 PM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource generateInitialSplits
INFO: Partitions assigned to split 0 (total 1): test-http-logs-json-0
May 04, 2017 5:02:13 PM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        group.id = http-logs-beam-json
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [datanode2-cm1.mis-cds.local:6667, datanode3-cm1.mis-cds.local:6667, datanode6-cm1.mis-cds.local:6667]
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.keystore.type = JKS
        ssl.trustmanager.algorithm = PKIX
        enable.auto.commit = true
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        ssl.truststore.password = null
        session.timeout.ms = 30000
        metrics.num.samples = 2
        client.id =
        ssl.endpoint.identification.algorithm = null
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        ssl.protocol = TLS
        check.crcs = true
        request.timeout.ms = 40000
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 1048576
        ssl.cipher.suites = null
        ssl.truststore.type = JKS
        security.protocol = PLAINTEXT
        ssl.truststore.location = null
        ssl.keystore.password = null
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        auto.offset.reset = earliest

May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version : 0.9.0.1
May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : 23c69d62a0cabf06
May 04, 2017 5:02:14 PM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader start
INFO: Reader-0: reading from test-http-logs-json-0 starting at offset 0
May 04, 2017 5:02:14 PM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ConsumerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        group.id = Reader-0_offset_consumer_1029147362_http-logs-beam-json
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [datanode2-cm1.mis-cds.local:6667, datanode3-cm1.mis-cds.local:6667, datanode6-cm1.mis-cds.local:6667]
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.keystore.type = JKS
        ssl.trustmanager.algorithm = PKIX
        enable.auto.commit = false
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        ssl.truststore.password = null
        session.timeout.ms = 30000
        metrics.num.samples = 2
        client.id =
        ssl.endpoint.identification.algorithm = null
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        ssl.protocol = TLS
        check.crcs = true
        request.timeout.ms = 40000
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 1048576
        ssl.cipher.suites = null
        ssl.truststore.type = JKS
        security.protocol = PLAINTEXT
        ssl.truststore.location = null
        ssl.keystore.password = null
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        auto.offset.reset = earliest

May 04, 2017 5:02:14 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version : 0.9.0.1
May 04, 2017 5:02:14 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : 23c69d62a0cabf06


Any suggestions? I have been on this for over a day now with various attempts, 
but nothing comes through.
When connecting to a different topic (one which I subscribe to directly via 
Flink in another application and get data from), I can set auto.offset.reset to 
earliest or latest and see different values for the offset, so Kafka appears to 
be available/visible etc.

Many thanks
Conrad




--
----
Mingmin




