Thanks for the confirmation, Conrad. Please let us know how Beam works for
you once Kafka is upgraded to 0.9+.

I wish the Kafka 0.9 consumer failed more explicitly, rather than just
hanging.

Raghu.

On Wed, May 10, 2017 at 5:17 AM, Conrad Crampton <
conrad.cramp...@secdata.com> wrote:

> Hi Raghu,
>
> I would like to be running 0.9 Kafka servers, but I can confirm that I am
> running 0.8.2 – off the HDP 2.3.0 stack. The confusion lies (I think) in a
> bug in the way Ambari reports the various versions of the stack after the
> HDP upgrade – specifically it reports Kafka as 0.9.0.1 – so I have been
> trying to use the equivalent client version. It is all very odd in that I
> am using a Nifi processor (PublishKafka), which is a v0.9 producer, that
> appears to create messages for the topic, yet I can only use a 0.8
> consumer to read from it. Now that I have cleared up the fact that I am in
> fact saddled with a 0.8.2 server I can move on and use the appropriate
> clients – alas, however, not Beam for the moment (given it requires 0.9+).
> I know we are running 0.8.2 on the server as the version is embedded in
> the kafka jar names, e.g. kafka_2.10-0.8.2.2.3.0.0-2557.jar.
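>
> For reference, the jar versions can be listed directly on a broker node –
> the path below assumes a typical HDP layout:
>
>     ls /usr/hdp/current/kafka-broker/libs/ | grep '^kafka'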
>
> So until I can get my HDP upgraded to include a later version of Kafka I
> am going to have to use Flink (or Spark etc.) for my current application,
> but once upgraded I will definitely come back to Beam as it appears to be a
> great product with terrific community support.
>
> Thanks again,
>
> Conrad
>
>
>
> From: Raghu Angadi <rang...@google.com>
> Reply-To: "user@beam.apache.org" <user@beam.apache.org>
> Date: Wednesday, 10 May 2017 at 00:20
>
> To: "user@beam.apache.org" <user@beam.apache.org>
> Subject: Re: KafkaIO nothing received?
>
>
>
> Hi Conrad,
>
>
>
> The Kafka consumer in Beam requires 0.9 or above. Almost certainly you are
> running 0.9 or newer servers. I don't think the new 0.9 client can talk to
> old brokers (though 0.9 brokers can talk to older clients). How did you
> confirm the server version? You can check the server log. But I might be
> mistaken about this incompatibility.
>
>
>
> Can you post a 'jstack' of the application while it is stuck (assuming you
> are running with the DirectRunner)?
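>
> For example (standard JDK tools; <pid> is the pipeline JVM's process id):
>
>     jps -lm                   # list running JVMs to find the pid
>     jstack <pid> > stack.txt  # dump all thread stacks to a file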
>
>
>
> > Kafka 0.8 requires a zookeeper connect property to be set, but I can’t
> set this using updateConsumerProperties as the value gets discarded.
>
>
>
> KafkaIO does not place any restrictions on ConsumerConfig except for the
> key and value deserializers. The message about discarding these would be
> from the Kafka consumer itself. I think it ignores configuration settings
> that it does not know about and logs them
> <https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L721>.
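>
> A minimal sketch of what I mean (Beam 0.6 API; the broker, topic and group
> names are placeholders):
>
>     KafkaIO.<String, String>read()
>         .withBootstrapServers("broker:9092")
>         .withTopics(ImmutableList.of("my-topic"))
>         .withKeyCoder(StringUtf8Coder.of())
>         .withValueCoder(StringUtf8Coder.of())
>         // KafkaIO passes these settings verbatim to the KafkaConsumer; a
>         // key the 0.9 client does not recognise ("zookeeper.connect") is
>         // logged and dropped by the client itself, not by KafkaIO.
>         .updateConsumerProperties(ImmutableMap.<String, Object>of(
>             "group.id", "my-group",
>             "zookeeper.connect", "zk-host:2181"));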
>
>
>
> Raghu.
>
>
>
> On Mon, May 8, 2017 at 1:57 AM, Conrad Crampton <
> conrad.cramp...@secdata.com> wrote:
>
> Hi Raghu,
>
> Yeah, the job just waits and does nothing. It reports the correct offset
> (this changes when I use ‘earliest’ or ‘latest’), but nothing is received.
> There are definitely messages in the queue. I am using Beam 0.6.
>
> With my other application using Flink, I am using the FlinkKafkaConsumer08
> libraries (and not FlinkKafkaConsumer09), as I am sure I had a similar
> problem then, i.e. no errors reported and everything appears to work fine,
> but nothing is actually received in the streaming job.
>
> Kafka 0.8 requires a zookeeper connect property to be set, but I can’t set
> this using updateConsumerProperties as the value gets discarded.
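>
> For comparison, the Flink 0.8 consumer takes those settings via a
> Properties object (a sketch only; host names are placeholders):
>
>     Properties props = new Properties();
>     props.setProperty("zookeeper.connect", "zk-host:2181"); // required by 0.8
>     props.setProperty("bootstrap.servers", "broker:9092");
>     props.setProperty("group.id", "http-logs-flink");
>     DataStream<String> stream = env.addSource(
>             new FlinkKafkaConsumer08<>("test-http-logs-json",
>                     new SimpleStringSchema(), props));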
>
> Thanks
>
> Conrad
>
>
>
> From: Raghu Angadi <rang...@google.com>
> Reply-To: "user@beam.apache.org" <user@beam.apache.org>
> Date: Thursday, 4 May 2017 at 18:27
>
>
> To: "user@beam.apache.org" <user@beam.apache.org>
> Subject: Re: KafkaIO nothing received?
>
>
>
> Conrad,
>
>
>
> It does not look like there is a version incompatibility; you would see
> errors during initialization otherwise. The log line "INFO: Reader-0:
> reading from test-http-logs-json-0 starting at offset 0" says the Kafka
> consumer was able to connect to the servers.
>
>
>
> Does the job just wait inside p.run()? What version of Beam are you using?
> If it is just waiting for records, please ensure the topic has messages
> (using kafka-console-consumer.sh etc., or the check sketched below).
> Alternatively, you can try reading from the other topic you mentioned that
> worked fine.
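>
> One quick way to confirm the topic has messages is to ask the brokers for
> the latest offsets (a sketch using the 0.8-era GetOffsetShell tool; the
> broker address is a placeholder):
>
>     bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
>         --broker-list broker:9092 --topic test-http-logs-json --time -1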
>
>
>
> Raghu.
>
>
>
> On Thu, May 4, 2017 at 10:07 AM, Conrad Crampton <
> conrad.cramp...@secdata.com> wrote:
>
> Hi,
>
> Ok, good to know I’m not going totally mad.
>
> I think I may have been running around in circles unnecessarily <grrr>
>
> I am using Kafka as part of an HDP installation (with Ambari). The Ambari
> interface reports my Kafka version as 0.9.0.2.3, and indeed the output
> given previously shows "INFO: Kafka version : 0.9.0.1" (which doesn't make
> particular sense). So I have ssh'd onto the server and looked at the libs
> for Kafka, and they are kafka_2.10-0.8.2.2.3.0.0-2557.jar, so I'm guessing
> something is not quite right. This is incredibly frustrating, as it looks
> like I am trying to connect to a v0.9 Kafka but it's actually v0.8, which
> clearly is very different wrt/ zookeeper etc. This is also backed up by
> kafka-console-consumer.sh (and all the other tools) asking for mandatory
> zookeeper options, which shouldn't be necessary as far as I understand it
> for v0.9.
>
>
>
> I am currently looking at
> https://github.com/mxm/incubator-beam/blob/63bce07d8c6cc5e610ad24e915e2585fef582567/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
> to see if I can use this code somehow to use Beam with Kafka v0.8. I am
> really hoping so, as I have no option to upgrade currently and I really
> like the abstraction of Beam.
>
>
>
> Thanks
>
> Conrad
>
>
>
>
>
> From: Mingmin Xu <mingm...@gmail.com>
> Reply-To: "user@beam.apache.org" <user@beam.apache.org>
> Date: Thursday, 4 May 2017 at 17:59
> To: "user@beam.apache.org" <user@beam.apache.org>
> Subject: Re: KafkaIO nothing received?
>
>
>
> @Conrad,
>
> Your code should be good to go – I can run it in my local env. There are
> two points you may want to check:
>
> 1). does the topic have data? You can confirm with the kafka cli
> 'bin/kafka-console-consumer.sh' (a sketch follows this list);
>
> 2). is the port in bootstrapServers right? By default it's 9092.
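>
> For the first point, a sketch of the check (0.8-era CLI flags; the
> zookeeper host is a placeholder):
>
>     bin/kafka-console-consumer.sh --zookeeper zk-host:2181 \
>         --topic test-http-logs-json --from-beginning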
>
>
>
>
>
> On Thu, May 4, 2017 at 9:05 AM, Conrad Crampton <
> conrad.cramp...@secdata.com> wrote:
>
> Hi,
>
> New to the group – ‘hello’!
>
>
>
> Just starting to look into Beam and I very much like the concepts, but I
> have rather fallen at the first hurdle – that being trying to subscribe to
> a kafka topic and process the results.
>
> Very simply, the following code doesn't receive any records (data is
> definitely going into the queue) – I just get nothing.
>
> I have tried on both direct-runner and flink-runner (using the Quickstart
> as a base for options, mvn profile etc.)
>
>
>
> Code
>
>
>
> Pipeline p = Pipeline.create(options);
>
> List<String> topics = ImmutableList.of("test-http-logs-json");
>
> PCollection<String> logs = p.apply(KafkaIO.<String, String>read()
>         .withBootstrapServers("datanode2-cm1.mis-cds.local:6667,datanode3-cm1.mis-cds.local:6667,datanode6-cm1.mis-cds.local:6667")
>         .withTopics(topics)
>         // set a Coder for Key and Value
>         .withKeyCoder(StringUtf8Coder.of())
>         .withValueCoder(StringUtf8Coder.of())
>         .withMaxNumRecords(10)
>         .updateConsumerProperties(ImmutableMap.<String, Object>builder()
>                 .put("auto.offset.reset", (Object) "earliest")
>                 .put("group.id", (Object) "http-logs-beam-json")
>                 .put("enable.auto.commit", (Object) "true")
>                 .put("receive.buffer.bytes", 1024 * 1024)
>                 .build())
>         .withoutMetadata())
>         .apply("Transform", MapElements.via(new SimpleFunction<KV<String, String>, String>() {
>             @Override
>             public String apply(KV<String, String> input) {
>                 log.debug("{}", input.getValue());
>                 return input.getKey() + " " + input.getValue();
>             }
>         }));
>
> p.run();
>
>
>
>
>
> Result:
>
> May 04, 2017 5:02:13 PM org.apache.kafka.common.config.AbstractConfig
> logAll
>
> INFO: ConsumerConfig values:
>
>         metric.reporters = []
>
>         metadata.max.age.ms = 300000
>
>         value.deserializer = class org.apache.kafka.common.serialization.
> ByteArrayDeserializer
>
>         group.id = http-logs-beam-json
>
>         partition.assignment.strategy = [org.apache.kafka.clients.
> consumer.RangeAssignor]
>
>         reconnect.backoff.ms = 50
>
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>
>         max.partition.fetch.bytes = 1048576
>
>         bootstrap.servers = [datanode2-cm1.mis-cds.local:6667,
> datanode3-cm1.mis-cds.local:6667, datanode6-cm1.mis-cds.local:6667]
>
>         retry.backoff.ms = 100
>
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>
>         sasl.kerberos.service.name = null
>
>         sasl.kerberos.ticket.renew.jitter = 0.05
>
>         ssl.keystore.type = JKS
>
>         ssl.trustmanager.algorithm = PKIX
>
>         enable.auto.commit = true
>
>         ssl.key.password = null
>
>         fetch.max.wait.ms = 500
>
>         sasl.kerberos.min.time.before.relogin = 60000
>
>         connections.max.idle.ms = 540000
>
>         ssl.truststore.password = null
>
>         session.timeout.ms = 30000
>
>         metrics.num.samples = 2
>
>         client.id =
>
>         ssl.endpoint.identification.algorithm = null
>
>         key.deserializer = class org.apache.kafka.common.serialization.
> ByteArrayDeserializer
>
>         ssl.protocol = TLS
>
>         check.crcs = true
>
>         request.timeout.ms = 40000
>
>        ssl.provider = null
>
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>
>         ssl.keystore.location = null
>
>         heartbeat.interval.ms = 3000
>
>         auto.commit.interval.ms = 5000
>
>         receive.buffer.bytes = 1048576
>
>         ssl.cipher.suites = null
>
>         ssl.truststore.type = JKS
>
>         security.protocol = PLAINTEXT
>
>         ssl.truststore.location = null
>
>         ssl.keystore.password = null
>
>         ssl.keymanager.algorithm = SunX509
>
>         metrics.sample.window.ms = 30000
>
>         fetch.min.bytes = 1
>
>         send.buffer.bytes = 131072
>
>         auto.offset.reset = earliest
>
>
>
> May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo
> <init>
>
> INFO: Kafka version : 0.9.0.1
>
> May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo
> <init>
>
> INFO: Kafka commitId : 23c69d62a0cabf06
>
> May 04, 2017 5:02:13 PM 
> org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource
> generateInitialSplits
>
> INFO: Partitions assigned to split 0 (total 1): test-http-logs-json-0
>
> May 04, 2017 5:02:13 PM org.apache.kafka.common.config.AbstractConfig
> logAll
>
> INFO: ConsumerConfig values:
>
>         metric.reporters = []
>
>         metadata.max.age.ms = 300000
>
>         value.deserializer = class org.apache.kafka.common.serialization.
> ByteArrayDeserializer
>
>         group.id = http-logs-beam-json
>
>         partition.assignment.strategy = [org.apache.kafka.clients.
> consumer.RangeAssignor]
>
>         reconnect.backoff.ms = 50
>
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>
>         max.partition.fetch.bytes = 1048576
>
>         bootstrap.servers = [datanode2-cm1.mis-cds.local:6667,
> datanode3-cm1.mis-cds.local:6667, datanode6-cm1.mis-cds.local:6667]
>
>         retry.backoff.ms = 100
>
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>
>         sasl.kerberos.service.name = null
>
>         sasl.kerberos.ticket.renew.jitter = 0.05
>
>         ssl.keystore.type = JKS
>
>         ssl.trustmanager.algorithm = PKIX
>
>         enable.auto.commit = true
>
>         ssl.key.password = null
>
>         fetch.max.wait.ms = 500
>
>         sasl.kerberos.min.time.before.relogin = 60000
>
>         connections.max.idle.ms = 540000
>
>         ssl.truststore.password = null
>
>         session.timeout.ms = 30000
>
>         metrics.num.samples = 2
>
>         client.id =
>
>         ssl.endpoint.identification.algorithm = null
>
>         key.deserializer = class org.apache.kafka.common.serialization.
> ByteArrayDeserializer
>
>         ssl.protocol = TLS
>
>         check.crcs = true
>
>         request.timeout.ms = 40000
>
>         ssl.provider = null
>
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>
>         ssl.keystore.location = null
>
>         heartbeat.interval.ms = 3000
>
>         auto.commit.interval.ms = 5000
>
>         receive.buffer.bytes = 1048576
>
>         ssl.cipher.suites = null
>
>         ssl.truststore.type = JKS
>
>         security.protocol = PLAINTEXT
>
>         ssl.truststore.location = null
>
>         ssl.keystore.password = null
>
>         ssl.keymanager.algorithm = SunX509
>
>         metrics.sample.window.ms = 30000
>
>         fetch.min.bytes = 1
>
>         send.buffer.bytes = 131072
>
>         auto.offset.reset = earliest
>
>
>
> May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo
> <init>
>
> INFO: Kafka version : 0.9.0.1
>
> May 04, 2017 5:02:13 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo
> <init>
>
> INFO: Kafka commitId : 23c69d62a0cabf06
>
> May 04, 2017 5:02:14 PM 
> org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader
> start
>
> INFO: Reader-0: reading from test-http-logs-json-0 starting at offset 0
>
> May 04, 2017 5:02:14 PM org.apache.kafka.common.config.AbstractConfig
> logAll
>
> INFO: ConsumerConfig values:
>
>         metric.reporters = []
>
>         metadata.max.age.ms = 300000
>
>         value.deserializer = class org.apache.kafka.common.serialization.
> ByteArrayDeserializer
>
>         group.id = Reader-0_offset_consumer_1029147362_http-logs-beam-json
>
>         partition.assignment.strategy = [org.apache.kafka.clients.
> consumer.RangeAssignor]
>
>         reconnect.backoff.ms = 50
>
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>
>         max.partition.fetch.bytes = 1048576
>
>         bootstrap.servers = [datanode2-cm1.mis-cds.local:6667,
> datanode3-cm1.mis-cds.local:6667, datanode6-cm1.mis-cds.local:6667]
>
>         retry.backoff.ms = 100
>
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>
>         sasl.kerberos.service.name = null
>
>         sasl.kerberos.ticket.renew.jitter = 0.05
>
>         ssl.keystore.type = JKS
>
>         ssl.trustmanager.algorithm = PKIX
>
>         enable.auto.commit = false
>
>         ssl.key.password = null
>
>         fetch.max.wait.ms = 500
>
>         sasl.kerberos.min.time.before.relogin = 60000
>
>         connections.max.idle.ms = 540000
>
>         ssl.truststore.password = null
>
>         session.timeout.ms = 30000
>
>         metrics.num.samples = 2
>
>         client.id =
>
>         ssl.endpoint.identification.algorithm = null
>
>         key.deserializer = class org.apache.kafka.common.serialization.
> ByteArrayDeserializer
>
>         ssl.protocol = TLS
>
>         check.crcs = true
>
>         request.timeout.ms = 40000
>
>         ssl.provider = null
>
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>
>         ssl.keystore.location = null
>
>         heartbeat.interval.ms = 3000
>
>         auto.commit.interval.ms = 5000
>
>         receive.buffer.bytes = 1048576
>
>         ssl.cipher.suites = null
>
>         ssl.truststore.type = JKS
>
>         security.protocol = PLAINTEXT
>
>         ssl.truststore.location = null
>
>         ssl.keystore.password = null
>
>         ssl.keymanager.algorithm = SunX509
>
>         metrics.sample.window.ms = 30000
>
>         fetch.min.bytes = 1
>
>         send.buffer.bytes = 131072
>
>         auto.offset.reset = earliest
>
>
>
> May 04, 2017 5:02:14 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo
> <init>
>
> INFO: Kafka version : 0.9.0.1
>
> May 04, 2017 5:02:14 PM org.apache.kafka.common.utils.AppInfoParser$AppInfo
> <init>
>
> INFO: Kafka commitId : 23c69d62a0cabf06
>
>
>
>
>
> Any suggestions? I have been on this now for over a day with various
> attempts, but nothing comes through.
>
> When connecting to a different topic (which I subscribe to directly via
> Flink in another application and get data from), I can set
> "auto.offset.reset" to earliest or latest and see different values for the
> offset, so Kafka appears to be available/visible etc.
>
>
>
> Many thanks
>
> Conrad
>
>
>
>
>
>
>
> --
>
> ----
>
> Mingmin
>
>
>
>
>
>
>
>
