Thanks for the confirmation Conrad. Please let us know how Beam works for you once Kafka is upgraded to 0.9+.
I wish the Kafka 0.9 consumer failed more explicitly, rather than just hanging.

Raghu.

On Wed, May 10, 2017 at 5:17 AM, Conrad Crampton <conrad.cramp...@secdata.com> wrote:

> Hi Raghu,
>
> I would like to be running 0.9 Kafka servers, but I can confirm that I am
> running 0.8.2 – off the HDP 2.3.0 stack. The confusion arises (I think)
> because the HDP upgrade has a bug in the way Ambari reports the versions
> of the stack – specifically, it reports Kafka as 0.9.0.1 – so I had been
> trying to use the equivalent client version. It is all very odd in that I
> am using a Nifi processor (PublishKafka), which is a v0.9 producer and
> appears to create messages for the topic, yet I can only use a 0.8
> consumer to read from it. Now that I have cleared up the fact that I am in
> fact saddled with a 0.8.2 server, I can move on and use the appropriate
> clients – alas, not Beam for the moment (given it requires 0.9+). I know
> we are on 0.8.2 on the server as the version is embedded in the names of
> the Kafka jars, i.e. kafka_2.10-0.8.2.2.3.0.0-2557.jar.
>
> So until I can get my HDP upgraded to include a later version of Kafka I
> am going to have to use Flink (or Spark etc.) for my current application,
> but once upgraded I will definitely come back to Beam as it appears to be
> a great product with terrific community support.
>
> Thanks again,
> Conrad
>
> From: Raghu Angadi <rang...@google.com>
> Reply-To: "user@beam.apache.org" <user@beam.apache.org>
> Date: Wednesday, 10 May 2017 at 00:20
> To: "user@beam.apache.org" <user@beam.apache.org>
> Subject: Re: KafkaIO nothing received?
>
> Hi Conrad,
>
> The Kafka consumer in Beam requires 0.9 or above. Almost certainly you are
> running 0.9 or newer servers. I don't think the new 0.9 client can talk to
> old brokers (though 0.9 brokers can talk to older clients). How did you
> confirm the server version? You can check the server log. But I might be
> mistaken about this incompatibility.
>
> Can you post a 'jstack' of the application when it is stuck (assuming you
> are running with the DirectRunner)?
>
> > Kafka 0.8 requires a zookeeper connect property to be set, but I can’t
> > set this using updateConsumerProperties as the value gets discarded.
>
> KafkaIO does not place any restrictions on ConsumerConfig except for the
> key and value deserializers. The message about discarding these would be
> from the Kafka consumer itself. I think it ignores configuration settings
> that it does not know about and logs them
> <https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L721>.
>
> Raghu.
>
> On Mon, May 8, 2017 at 1:57 AM, Conrad Crampton <conrad.cramp...@secdata.com> wrote:
>
> Hi Raghu,
>
> Yeah, the job just waits and does nothing. It reports the correct offset
> (this changes when I use ‘earliest’ or ‘latest’), but nothing is received.
> There are definitely messages in the queue. I am using Beam 0.6.
>
> With my other application using Flink, I am using the FlinkKafkaConsumer08
> libraries (and not FlinkKafkaConsumer09), as I am sure I had a similar
> problem then, i.e. no errors reported and everything appeared to work
> fine, but nothing was actually received in the streaming job.
>
> Kafka 0.8 requires a zookeeper connect property to be set, but I can’t set
> this using updateConsumerProperties as the value gets discarded.
>
> Thanks
> Conrad
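For illustration, a minimal sketch (not from the thread itself) of the configuration path being discussed, using the same Beam 0.6 KafkaIO API that appears in the code later in this thread; the broker address, topic and group id are placeholders:

    import com.google.common.collect.ImmutableList;
    import com.google.common.collect.ImmutableMap;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.kafka.KafkaIO;

    // Sketch only. KafkaIO fixes only the key/value deserializers; every
    // entry passed to updateConsumerProperties is handed verbatim to the
    // underlying KafkaConsumer. The 0.9 client does not fail on keys it
    // does not recognise, such as "zookeeper.connect" -- it logs them as
    // unused and carries on, which is why the value appears "discarded".
    KafkaIO.<String, String>read()
        .withBootstrapServers("broker1:9092")           // placeholder address
        .withTopics(ImmutableList.of("some-topic"))     // placeholder topic
        .withKeyCoder(StringUtf8Coder.of())
        .withValueCoder(StringUtf8Coder.of())
        .updateConsumerProperties(ImmutableMap.<String, Object>of(
            "group.id", "some-group",                   // placeholder group id
            "zookeeper.connect", "zk1:2181"));          // ignored by 0.9 client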
> From: Raghu Angadi <rang...@google.com>
> Reply-To: "user@beam.apache.org" <user@beam.apache.org>
> Date: Thursday, 4 May 2017 at 18:27
> To: "user@beam.apache.org" <user@beam.apache.org>
> Subject: Re: KafkaIO nothing received?
>
> Conrad,
>
> It does not look like there is a version incompatibility. You would see
> errors during initialization otherwise. The log line "INFO: Reader-0:
> reading from test-http-logs-json-0 starting at offset 0" says the Kafka
> consumer was able to connect to the servers.
>
> Does the job just wait inside p.run()? What version of Beam are you using?
> If it is just waiting for records, please ensure the topic has messages
> (using kafka-console-consumer.sh etc.). Alternately, you can try reading
> from the other topic you mentioned that worked fine.
>
> Raghu.
>
> On Thu, May 4, 2017 at 10:07 AM, Conrad Crampton <conrad.cramp...@secdata.com> wrote:
>
> Hi,
>
> Ok, good to know I’m not going totally mad.
> I think I may have been running around in circles unnecessarily <grrr>.
>
> I am using Kafka as part of an HDP installation (with Ambari). The Ambari
> interface reports my Kafka version as 0.9.0.2.3, and indeed the output
> given previously shows "INFO: Kafka version : 0.9.0.1" (which doesn’t make
> particular sense). So I have ssh’d onto the server and looked at the libs
> for Kafka, and they are kafka_2.10-0.8.2.2.3.0.0-2557.jar, so I’m guessing
> something is not quite right. This is incredibly frustrating, as it looks
> like I am trying to connect to v0.9 Kafka but it’s actually v0.8, which is
> clearly very different wrt/ zookeeper etc. This is also backed up by
> kafka-console-consumer.sh (and all the other tools) asking for mandatory
> zookeeper options, which as far as I understand shouldn’t be necessary for
> v0.9.
>
> I am currently looking at
> https://github.com/mxm/incubator-beam/blob/63bce07d8c6cc5e610ad24e915e2585fef582567/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
> to see if I can use this code somehow to use Beam with Kafka v0.8. I am
> really hoping to, as I have no option to upgrade currently and I really
> like the abstraction of Beam.
>
> Thanks
> Conrad
>
> From: Mingmin Xu <mingm...@gmail.com>
> Reply-To: "user@beam.apache.org" <user@beam.apache.org>
> Date: Thursday, 4 May 2017 at 17:59
> To: "user@beam.apache.org" <user@beam.apache.org>
> Subject: Re: KafkaIO nothing received?
>
> @Conrad,
>
> Your code should be good to go; I can run it in my local env. There are
> two points you should check:
> 1) Does the topic have data? You can confirm with the Kafka CLI
> 'bin/kafka-console-consumer.sh'.
> 2) Is the port in bootstrapServers right? By default it's 9092.
>
> On Thu, May 4, 2017 at 9:05 AM, Conrad Crampton <conrad.cramp...@secdata.com> wrote:
>
> Hi,
>
> New to the group – ‘hello’!
>
> Just starting to look into Beam and I very much like the concepts, but I
> have rather fallen at the first hurdle – that being trying to subscribe to
> a Kafka topic and process the results.
> Very simply, the following code doesn’t receive any records (data is
> definitely going into the queue) – I just get nothing.
> I have tried on both direct-runner and flink-runner (using the Quickstart
> as a base for options, mvn profile etc.).
> Code:
>
>     Pipeline p = Pipeline.create(options);
>     List<String> topics = ImmutableList.of("test-http-logs-json");
>
>     PCollection<String> logs = p.apply(KafkaIO.<String, String>read()
>             .withBootstrapServers("datanode2-cm1.mis-cds.local:6667,datanode3-cm1.mis-cds.local:6667,datanode6-cm1.mis-cds.local:6667")
>             .withTopics(topics)
>             // set a Coder for Key and Value
>             .withKeyCoder(StringUtf8Coder.of())
>             .withValueCoder(StringUtf8Coder.of())
>             .withMaxNumRecords(10)
>             .updateConsumerProperties(ImmutableMap.<String, Object>builder()
>                     .put("auto.offset.reset", (Object) "earliest")
>                     .put("group.id", (Object) "http-logs-beam-json")
>                     .put("enable.auto.commit", (Object) "true")
>                     .put("receive.buffer.bytes", 1024 * 1024)
>                     .build())
>             .withoutMetadata())
>         .apply("Transform", MapElements.via(new SimpleFunction<KV<String, String>, String>() {
>             @Override
>             public String apply(KV<String, String> input) {
>                 log.debug("{}", input.getValue());
>                 return input.getKey() + " " + input.getValue();
>             }
>         }));
>
>     p.run();
>
> Result:
>
>     May 04, 2017 5:02:13 PM org.apache.kafka.common.config.AbstractConfig logAll
>     INFO: ConsumerConfig values:
>         metric.reporters = []
>         metadata.max.age.ms = 300000
>         value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>         group.id = http-logs-beam-json
>         partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>         reconnect.backoff.ms = 50
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>         max.partition.fetch.bytes = 1048576
>         bootstrap.servers = [datanode2-cm1.mis-cds.local:6667, datanode3-cm1.mis-cds.local:6667, datanode6-cm1.mis-cds.local:6667]
>         retry.backoff.ms = 100
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>         sasl.kerberos.service.name = null
>         sasl.kerberos.ticket.renew.jitter = 0.05
>         ssl.keystore.type = JKS
>         ssl.trustmanager.algorithm = PKIX
>         enable.auto.commit = true
>         ssl.key.password = null
>         fetch.max.wait.ms = 500
>         sasl.kerberos.min.time.before.relogin = 60000
>         connections.max.idle.ms = 540000
>         ssl.truststore.password = null
>         session.timeout.ms = 30000
>         metrics.num.samples = 2
>         client.id =
>         ssl.endpoint.identification.algorithm = null
>         key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>         ssl.protocol = TLS
>         check.crcs = true
>         request.timeout.ms = 40000
>         ssl.provider = null
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>         ssl.keystore.location = null
>         heartbeat.interval.ms = 3000
>         auto.commit.interval.ms = 5000
>         receive.buffer.bytes = 1048576
>         ssl.cipher.suites = null
>         ssl.truststore.type = JKS
>         security.protocol = PLAINTEXT
>         ssl.truststore.location = null
>         ssl.keystore.password = null
>         ssl.keymanager.algorithm = SunX509
>         metrics.sample.window.ms = 30000
>         fetch.min.bytes = 1
>         send.buffer.bytes = 131072
>         auto.offset.reset = earliest
>
>     May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
>     INFO: Kafka version : 0.9.0.1
>     May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
>     INFO: Kafka commitId : 23c69d62a0cabf06
>     May 04, 2017 5:02:13 PM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource generateInitialSplits
>     INFO: Partitions assigned to split 0 (total 1): test-http-logs-json-0
>     May 04, 2017 5:02:13 PM org.apache.kafka.common.config.AbstractConfig logAll
>     INFO: ConsumerConfig values:
>         [identical to the ConsumerConfig values above]
>     May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
>     INFO: Kafka version : 0.9.0.1
>     May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
>     INFO: Kafka commitId : 23c69d62a0cabf06
>     May 04, 2017 5:02:14 PM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader start
>     INFO: Reader-0: reading from test-http-logs-json-0 starting at offset 0
>     May 04, 2017 5:02:14 PM org.apache.kafka.common.config.AbstractConfig logAll
>     INFO: ConsumerConfig values:
>         metric.reporters = []
>         metadata.max.age.ms = 300000
>         value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>         group.id = Reader-0_offset_consumer_1029147362_http-logs-beam-json
>         partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>         reconnect.backoff.ms = 50
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>         max.partition.fetch.bytes = 1048576
>         bootstrap.servers = [datanode2-cm1.mis-cds.local:6667, datanode3-cm1.mis-cds.local:6667, datanode6-cm1.mis-cds.local:6667]
>         retry.backoff.ms = 100
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>         sasl.kerberos.service.name = null
>         sasl.kerberos.ticket.renew.jitter = 0.05
>         ssl.keystore.type = JKS
>         ssl.trustmanager.algorithm = PKIX
>         enable.auto.commit = false
>         ssl.key.password = null
>         fetch.max.wait.ms = 500
>         sasl.kerberos.min.time.before.relogin = 60000
>         connections.max.idle.ms = 540000
>         ssl.truststore.password = null
>         session.timeout.ms = 30000
>         metrics.num.samples = 2
>         client.id =
>         ssl.endpoint.identification.algorithm = null
>         key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>         ssl.protocol = TLS
>         check.crcs = true
>         request.timeout.ms = 40000
>         ssl.provider = null
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>         ssl.keystore.location = null
>         heartbeat.interval.ms = 3000
>         auto.commit.interval.ms = 5000
>         receive.buffer.bytes = 1048576
>         ssl.cipher.suites = null
>         ssl.truststore.type = JKS
>         security.protocol = PLAINTEXT
>         ssl.truststore.location = null
>         ssl.keystore.password = null
>         ssl.keymanager.algorithm = SunX509
>         metrics.sample.window.ms = 30000
>         fetch.min.bytes = 1
>         send.buffer.bytes = 131072
>         auto.offset.reset = earliest
>
>     May 04, 2017 5:02:14 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
>     INFO: Kafka version : 0.9.0.1
>     May 04, 2017 5:02:14 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
>     INFO: Kafka commitId : 23c69d62a0cabf06
>
> Any suggestions? I have been on this now for over a day with various
> attempts, but nothing comes through.
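As a way of acting on the advice above to confirm the topic has data independently of Beam, here is a minimal sketch using the plain Kafka 0.9 consumer API directly. The class name is invented for illustration; the broker, group id and topic mirror the ones in the thread. If this also returns nothing against these brokers, the problem is client/broker compatibility rather than KafkaIO:

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    // Hypothetical class name, for illustration only.
    public class TopicSmokeTest {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Same bootstrap.servers as the pipeline above.
            props.put("bootstrap.servers", "datanode2-cm1.mis-cds.local:6667");
            props.put("group.id", "smoke-test");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Arrays.asList("test-http-logs-json"));
                // Poll a few times; note that a 0.9 client against a 0.8 broker
                // tends to hang or return nothing rather than fail loudly.
                for (int i = 0; i < 10; i++) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("offset=%d key=%s value=%s%n",
                            record.offset(), record.key(), record.value());
                    }
                }
            }
        }
    }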
> When connecting to a different topic (which I subscribe to directly via
> Flink in another application and do get data from), I can set
> .put("auto.offset.reset", (Object) "earliest") to earliest or latest and
> see different values for the offset, so Kafka appears to be
> available/visible etc.
>
> Many thanks
> Conrad
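The earliest-versus-latest observation above can also be turned into a quick check of whether the partition actually holds messages. A sketch against the 0.9 consumer API, under the assumption that seekToBeginning/seekToEnd take varargs TopicPartition as in the 0.9 client (they take a Collection in later clients); the class name is invented for illustration:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical class name, for illustration only.
    public class OffsetRangeCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "datanode2-cm1.mis-cds.local:6667");
            props.put("group.id", "offset-range-check");
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            TopicPartition tp = new TopicPartition("test-http-logs-json", 0);
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.assign(Collections.singletonList(tp));
                consumer.seekToBeginning(tp);     // the 'earliest' position
                long earliest = consumer.position(tp);
                consumer.seekToEnd(tp);           // the 'latest' position
                long latest = consumer.position(tp);
                // If latest > earliest, the partition definitely holds
                // messages, whatever the pipeline itself reports.
                System.out.printf("earliest=%d latest=%d (%d messages in range)%n",
                    earliest, latest, latest - earliest);
            }
        }
    }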