Hi Conrad,

The Kafka consumer in Beam is 0.9 or above, so almost certainly you need to
be running 0.9 or newer servers. I don't think the new 0.9 client can talk
to old brokers (though 0.9 brokers can talk to older clients). How did you
confirm the server version? You can check the server log. But I might be
mistaken about this incompatibility.

Can you post a 'jstack' dump of the application when it is stuck (assuming
you are running with the DirectRunner)?
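
If jstack is awkward to run, a minimal in-process equivalent (a sketch in
plain Java, not a KafkaIO or Beam API) is:

// Dump every live thread's stack to stdout, similar to jstack output.
for (java.util.Map.Entry<Thread, StackTraceElement[]> e :
        Thread.getAllStackTraces().entrySet()) {
    System.out.println(e.getKey());
    for (StackTraceElement frame : e.getValue()) {
        System.out.println("    at " + frame);
    }
}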

> Kafka 0.8 requires a zookeeper connect property to be set, but I can’t
set this using updateConsumerProperties as the value gets discarded.

KafkaIO does not place any restrictions on ConsumerConfig except for the
key and value deserializers. The message about discarding these would be
from the Kafka consumer itself; I think it ignores configuration settings
that it does not know about and logs them
<https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L721>.
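
For example, a setting like zookeeper.connect should pass straight through
KafkaIO to the consumer (a minimal sketch against the API in your code; the
ZK address is a placeholder):

KafkaIO.<String, String>read()
    .withBootstrapServers("datanode2-cm1.mis-cds.local:6667")
    .withTopics(ImmutableList.of("test-http-logs-json"))
    // KafkaIO forwards this map untouched; the 0.9 client then logs and
    // ignores any key it does not recognize, such as zookeeper.connect.
    .updateConsumerProperties(ImmutableMap.<String, Object>of(
        "zookeeper.connect", "zk-host:2181"));  // placeholder ZK address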

Raghu.

On Mon, May 8, 2017 at 1:57 AM, Conrad Crampton <conrad.cramp...@secdata.com> wrote:

> Hi Raghu,
>
> Yeah, the job just waits and does nothing. It reports the correct offset
> (this changes when I use ‘earliest’ or ‘latest’), but nothing is received.
> There are definitely messages in the queue. I am using Beam 0.6.
>
> With my other application using Flink, I am using the FlinkKafkaConsumer08
> libraries (and not FlinkKafkaConsumer09), as I am fairly sure I hit a
> similar problem then, i.e. no errors reported and everything appears to
> work fine, but nothing actually received in the streaming job.
>
> Kafka 0.8 requires a zookeeper connect property to be set, but I can’t set
> this using updateConsumerProperties as the value gets discarded.
>
> Thanks
>
> Conrad
>
>
>
> From: Raghu Angadi <rang...@google.com>
> Reply-To: "user@beam.apache.org" <user@beam.apache.org>
> Date: Thursday, 4 May 2017 at 18:27
> To: "user@beam.apache.org" <user@beam.apache.org>
> Subject: Re: KafkaIO nothing received?
>
>
>
> Conrad,
>
>
>
> It does not look like there is a version incompatibility; you would see
> errors during initialization otherwise. The log line "INFO: Reader-0:
> reading from test-http-logs-json-0 starting at offset 0" says the Kafka
> consumer was able to connect to the servers.
>
>
>
> Does the job just wait inside p.run()? What version of Beam are you
> using? If it is just waiting for records, please ensure the topic has
> messages (using kafka-console-consumer.sh etc.). Alternatively, you can
> try reading from the other topic you mentioned that worked fine.
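>
> Something like this minimal consumer (a sketch against the plain 0.9
> client API, reusing your broker list and topic; the group id is a
> placeholder) can confirm messages are visible outside Beam:
>
> import java.util.Arrays;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
>
> Properties props = new Properties();
> props.put("bootstrap.servers", "datanode2-cm1.mis-cds.local:6667");
> props.put("group.id", "connectivity-check");  // placeholder group id
> props.put("auto.offset.reset", "earliest");
> props.put("key.deserializer",
>     "org.apache.kafka.common.serialization.StringDeserializer");
> props.put("value.deserializer",
>     "org.apache.kafka.common.serialization.StringDeserializer");
>
> try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
>     consumer.subscribe(Arrays.asList("test-http-logs-json"));
>     // Poll for up to 10 seconds; count() > 0 means the topic is readable.
>     ConsumerRecords<String, String> records = consumer.poll(10000);
>     System.out.println("fetched " + records.count() + " records");
> }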
>
>
>
> Raghu.
>
>
>
> On Thu, May 4, 2017 at 10:07 AM, Conrad Crampton <conrad.cramp...@secdata.com> wrote:
>
> Hi,
>
> Ok, good to know I’m not going totally mad.
>
> I think I may have been running around in circles unnecessarily <grrr>
>
> I am using Kafka as part of an HDP installation (with Ambari). The Ambari
> interface reports my Kafka version as 0.9.0.2.3, and indeed the output
> given previously shows
>
> INFO: Kafka version : 0.9.0.1 (which doesn’t make particular sense). So I
> have ssh’d onto the server and looked at the libs for Kafka, and they are
> kafka_2.10-0.8.2.2.3.0.0-2557.jar, so I’m guessing something is not quite
> right. This is incredibly frustrating, as it looks like I am trying to
> connect to v0.9 Kafka but it’s actually v0.8, which is clearly very
> different wrt/ zookeeper etc. This is also backed up by the fact that
> kafka-console-consumer.sh (and all the other tools) ask for mandatory
> zookeeper options, which shouldn’t be necessary for v0.9 as far as I
> understand it.
>
>
>
> I am currently looking at
> https://github.com/mxm/incubator-beam/blob/63bce07d8c6cc5e610ad24e915e2585fef582567/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
> to see if I can use this code somehow to use Beam with Kafka v0.8. I am
> really hoping to, as I have no option to upgrade currently and I really
> like the abstraction of Beam.
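>
> For reference, the heart of that example (a rough sketch of that file from
> memory; treat the exact wrapper API, e.g. UnboundedFlinkSource, as an
> assumption) wraps the Flink 0.8 consumer as an unbounded Beam source:
>
> Properties props = new Properties();
> // zookeeper.connect is required by the 0.8 consumer; the address is a placeholder.
> props.setProperty("zookeeper.connect", "zk-host:2181");
> props.setProperty("bootstrap.servers", "datanode2-cm1.mis-cds.local:6667");
> props.setProperty("group.id", "http-logs-beam-json");
>
> FlinkKafkaConsumer08<String> kafkaConsumer = new FlinkKafkaConsumer08<>(
>         "test-http-logs-json", new SimpleStringSchema(), props);
>
> // Wrap the Flink source so the Beam pipeline can read from it (Flink runner only).
> p.apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer)));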
>
>
>
> Thanks
>
> Conrad
>
>
>
>
>
> From: Mingmin Xu <mingm...@gmail.com>
> Reply-To: "user@beam.apache.org" <user@beam.apache.org>
> Date: Thursday, 4 May 2017 at 17:59
> To: "user@beam.apache.org" <user@beam.apache.org>
> Subject: Re: KafkaIO nothing received?
>
>
>
> @Conrad,
>
> Your code should be good to go; I can run it in my local env. There are
> two points you may want to check:
>
> 1) does the topic have data in it? You can confirm with the Kafka CLI
> bin/kafka-console-consumer.sh;
>
> 2) is the port in bootstrapServers right? By default it's 9092 (a quick
> check is sketched below).
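>
> For point 2, a quick reachability check for one broker (plain Java; host
> and port taken from your config) is:
>
> try (java.net.Socket s = new java.net.Socket()) {
>     // Throws if nothing is listening on the broker port within 5 seconds.
>     s.connect(new java.net.InetSocketAddress("datanode2-cm1.mis-cds.local", 6667), 5000);
>     System.out.println("broker port reachable");
> }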
>
>
>
>
>
> On Thu, May 4, 2017 at 9:05 AM, Conrad Crampton <conrad.cramp...@secdata.com> wrote:
>
> Hi,
>
> New to the group – ‘hello’!
>
>
>
> Just starting to look into Beam and I very much like the concepts, but
> have rather fallen at the first hurdle – that being trying to subscribe to
> a kafka topic and process results.
>
> Very simply, the following code doesn’t receive any records (the data
> is going into the queue) – I just get nothing.
>
> I have tried on both direct-runner and flink-runner (using the Quickstart
> as a base for options, mvn profile etc.)
>
>
>
> Code
>
>
>
> Pipeline p = Pipeline.create(options);
>
> List<String> topics = ImmutableList.of("test-http-logs-json");
>
> PCollection<String> logs = p.apply(KafkaIO.<String, String>read()
>         .withBootstrapServers("datanode2-cm1.mis-cds.local:6667,datanode3-cm1.mis-cds.local:6667,datanode6-cm1.mis-cds.local:6667")
>         .withTopics(topics)
>         // set a Coder for Key and Value
>         .withKeyCoder(StringUtf8Coder.of())
>         .withValueCoder(StringUtf8Coder.of())
>         .withMaxNumRecords(10)
>         .updateConsumerProperties(ImmutableMap.<String, Object>builder()
>                 .put("auto.offset.reset", (Object) "earliest")
>                 .put("group.id", (Object) "http-logs-beam-json")
>                 .put("enable.auto.commit", (Object) "true")
>                 .put("receive.buffer.bytes", 1024 * 1024)
>                 .build())
>         .withoutMetadata())
>         .apply("Transform ", MapElements.via(new SimpleFunction<KV<String, String>, String>() {
>             @Override
>             public String apply(KV<String, String> input) {
>                 log.debug("{}", input.getValue());
>                 return input.getKey() + " " + input.getValue();
>             }
>         }));
>
> p.run();
>
>
>
>
>
> Result:
>
> May 04, 2017 5:02:13 PM org.apache.kafka.common.config.AbstractConfig
> logAll
>
> INFO: ConsumerConfig values:
>
>         metric.reporters = []
>
>         metadata.max.age.ms = 300000
>
>         value.deserializer = class org.apache.kafka.common.serialization.
> ByteArrayDeserializer
>
>         group.id = http-logs-beam-json
>
>         partition.assignment.strategy = [org.apache.kafka.clients.
> consumer.RangeAssignor]
>
>         reconnect.backoff.ms = 50
>
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>
>         max.partition.fetch.bytes = 1048576
>
>         bootstrap.servers = [datanode2-cm1.mis-cds.local:6667,
> datanode3-cm1.mis-cds.local:6667, datanode6-cm1.mis-cds.local:6667]
>
>         retry.backoff.ms = 100
>
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>
>         sasl.kerberos.service.name = null
>
>         sasl.kerberos.ticket.renew.jitter = 0.05
>
>         ssl.keystore.type = JKS
>
>         ssl.trustmanager.algorithm = PKIX
>
>         enable.auto.commit = true
>
>         ssl.key.password = null
>
>         fetch.max.wait.ms = 500
>
>         sasl.kerberos.min.time.before.relogin = 60000
>
>         connections.max.idle.ms = 540000
>
>         ssl.truststore.password = null
>
>         session.timeout.ms = 30000
>
>         metrics.num.samples = 2
>
>         client.id =
>
>         ssl.endpoint.identification.algorithm = null
>
>         key.deserializer = class org.apache.kafka.common.serialization.
> ByteArrayDeserializer
>
>         ssl.protocol = TLS
>
>         check.crcs = true
>
>         request.timeout.ms = 40000
>
>         ssl.provider = null
>
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>
>         ssl.keystore.location = null
>
>         heartbeat.interval.ms = 3000
>
>         auto.commit.interval.ms = 5000
>
>         receive.buffer.bytes = 1048576
>
>         ssl.cipher.suites = null
>
>         ssl.truststore.type = JKS
>
>         security.protocol = PLAINTEXT
>
>         ssl.truststore.location = null
>
>         ssl.keystore.password = null
>
>         ssl.keymanager.algorithm = SunX509
>
>         metrics.sample.window.ms = 30000
>
>         fetch.min.bytes = 1
>
>         send.buffer.bytes = 131072
>
>         auto.offset.reset = earliest
>
>
>
> May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo
> <init>
>
> INFO: Kafka version : 0.9.0.1
>
> May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo
> <init>
>
> INFO: Kafka commitId : 23c69d62a0cabf06
>
> May 04, 2017 5:02:13 PM 
> org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource
> generateInitialSplits
>
> INFO: Partitions assigned to split 0 (total 1): test-http-logs-json-0
>
> May 04, 2017 5:02:13 PM org.apache.kafka.common.config.AbstractConfig
> logAll
>
> INFO: ConsumerConfig values:
>
>         metric.reporters = []
>
>         metadata.max.age.ms = 300000
>
>         value.deserializer = class org.apache.kafka.common.serialization.
> ByteArrayDeserializer
>
>         group.id = http-logs-beam-json
>
>         partition.assignment.strategy = [org.apache.kafka.clients.
> consumer.RangeAssignor]
>
>         reconnect.backoff.ms = 50
>
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>
>         max.partition.fetch.bytes = 1048576
>
>         bootstrap.servers = [datanode2-cm1.mis-cds.local:6667,
> datanode3-cm1.mis-cds.local:6667, datanode6-cm1.mis-cds.local:6667]
>
>         retry.backoff.ms = 100
>
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>
>         sasl.kerberos.service.name = null
>
>         sasl.kerberos.ticket.renew.jitter = 0.05
>
>         ssl.keystore.type = JKS
>
>         ssl.trustmanager.algorithm = PKIX
>
>         enable.auto.commit = true
>
>         ssl.key.password = null
>
>         fetch.max.wait.ms = 500
>
>         sasl.kerberos.min.time.before.relogin = 60000
>
>         connections.max.idle.ms = 540000
>
>         ssl.truststore.password = null
>
>         session.timeout.ms = 30000
>
>         metrics.num.samples = 2
>
>         client.id =
>
>         ssl.endpoint.identification.algorithm = null
>
>         key.deserializer = class org.apache.kafka.common.serialization.
> ByteArrayDeserializer
>
>         ssl.protocol = TLS
>
>         check.crcs = true
>
>         request.timeout.ms = 40000
>
>         ssl.provider = null
>
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>
>         ssl.keystore.location = null
>
>         heartbeat.interval.ms = 3000
>
>         auto.commit.interval.ms = 5000
>
>         receive.buffer.bytes = 1048576
>
>         ssl.cipher.suites = null
>
>         ssl.truststore.type = JKS
>
>         security.protocol = PLAINTEXT
>
>         ssl.truststore.location = null
>
>         ssl.keystore.password = null
>
>         ssl.keymanager.algorithm = SunX509
>
>         metrics.sample.window.ms = 30000
>
>         fetch.min.bytes = 1
>
>         send.buffer.bytes = 131072
>
>         auto.offset.reset = earliest
>
>
>
> May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo
> <init>
>
> INFO: Kafka version : 0.9.0.1
>
> May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo
> <init>
>
> INFO: Kafka commitId : 23c69d62a0cabf06
>
> May 04, 2017 5:02:14 PM 
> org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader
> start
>
> INFO: Reader-0: reading from test-http-logs-json-0 starting at offset 0
>
> May 04, 2017 5:02:14 PM org.apache.kafka.common.config.AbstractConfig
> logAll
>
> INFO: ConsumerConfig values:
>
>         metric.reporters = []
>
>         metadata.max.age.ms = 300000
>
>         value.deserializer = class org.apache.kafka.common.serialization.
> ByteArrayDeserializer
>
>         group.id = Reader-0_offset_consumer_1029147362_http-logs-beam-json
>
>         partition.assignment.strategy = [org.apache.kafka.clients.
> consumer.RangeAssignor]
>
>         reconnect.backoff.ms = 50
>
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>
>         max.partition.fetch.bytes = 1048576
>
>         bootstrap.servers = [datanode2-cm1.mis-cds.local:6667,
> datanode3-cm1.mis-cds.local:6667, datanode6-cm1.mis-cds.local:6667]
>
>         retry.backoff.ms = 100
>
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>
>         sasl.kerberos.service.name = null
>
>         sasl.kerberos.ticket.renew.jitter = 0.05
>
>         ssl.keystore.type = JKS
>
>         ssl.trustmanager.algorithm = PKIX
>
>         enable.auto.commit = false
>
>         ssl.key.password = null
>
>         fetch.max.wait.ms = 500
>
>         sasl.kerberos.min.time.before.relogin = 60000
>
>         connections.max.idle.ms = 540000
>
>         ssl.truststore.password = null
>
>         session.timeout.ms = 30000
>
>         metrics.num.samples = 2
>
>         client.id =
>
>         ssl.endpoint.identification.algorithm = null
>
>         key.deserializer = class org.apache.kafka.common.serialization.
> ByteArrayDeserializer
>
>         ssl.protocol = TLS
>
>         check.crcs = true
>
>         request.timeout.ms = 40000
>
>         ssl.provider = null
>
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>
>         ssl.keystore.location = null
>
>         heartbeat.interval.ms = 3000
>
>         auto.commit.interval.ms = 5000
>
>         receive.buffer.bytes = 1048576
>
>         ssl.cipher.suites = null
>
>         ssl.truststore.type = JKS
>
>         security.protocol = PLAINTEXT
>
>         ssl.truststore.location = null
>
>         ssl.keystore.password = null
>
>         ssl.keymanager.algorithm = SunX509
>
>         metrics.sample.window.ms = 30000
>
>         fetch.min.bytes = 1
>
>         send.buffer.bytes = 131072
>
>         auto.offset.reset = earliest
>
>
>
> May 04, 2017 5:02:14 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo
> <init>
>
> INFO: Kafka version : 0.9.0.1
>
> May 04, 2017 5:02:14 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo
> <init>
>
> INFO: Kafka commitId : 23c69d62a0cabf06
>
>
>
>
>
> Any suggestions? I have been on this now for over a day with various
> attempts, but nothing comes through.
>
> When connecting to a different topic (which I subscribe to directly via
> Flink in another application and do get data from), I can set the
> .put("auto.offset.reset", (Object) "earliest") property to earliest or
> latest and see different values for the offset, so Kafka appears to be
> available/visible etc.
>
>
>
> Many thanks
>
> Conrad
>
>
>
> --
>
> ----
>
> Mingmin
>
>
>
