Thanks for the confirmation, Conrad. Please let us know how Beam works for
you once Kafka is upgraded to 0.9+.

I wish the Kafka 0.9 consumer failed more explicitly, rather than just


> Hi Raghu,
> I would like to be running 0.9 Kafka servers, but I can confirm that I am
> running 0.8.2 – off the HDP 2.3.0 stack. The confusion arises (I think)
> because the HDP upgrade has a bug in the way Ambari reports the various
> versions of the stack – specifically Kafka as, so I have been trying to use
> the equivalent client version. It is all very odd in that I am using a Nifi
> processor (PublishKafka) which is a v0.9 producer which appears to create
> messages for the topic, but can only use a 0.8 consumer to read from it.
> Now that I have cleared up the fact that I am in fact saddled with a 0.8.2
> server I can move on and use the appropriate clients – alas however, not
> Beam for the moment (given it is 0.9+ compatible). I know we are 0.8.2 on
> the server as the version is within the jar names for the kafka jars i.e.
> kafka_2.10-
> So until I can get my HDP upgraded to include a later version of Kafka I
> am going to have to use Flink (or Spark etc.) for my current application,
> but once upgraded I will definitely come back to Beam as it appears to be a
> great product with terrific community support.
> Thanks again,
> Conrad
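The jar-name check described above can be automated. What follows is only a minimal sketch: it assumes the broker jars follow the usual kafka_<scalaVersion>-<kafkaVersion>.jar naming, and the class name, method names, and the sample file name "kafka_2.10-0.8.2.0.jar" are all hypothetical illustrations, not the actual file on the HDP host. It fails with an explicit message when the broker is older than the 0.9 that Beam's KafkaIO needs, rather than silently receiving nothing.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Sketch: read the Kafka version out of a broker jar name and fail fast if it is too old. */
final class KafkaJarVersionCheck {

    // Assumed naming convention: kafka_<scalaVersion>-<kafkaVersion>[-suffix].jar
    private static final Pattern JAR_NAME =
            Pattern.compile("kafka_(\\d+(?:\\.\\d+)*)-(\\d+(?:\\.\\d+)*).*\\.jar");

    /** Returns the dotted Kafka version embedded in the jar file name. */
    static String kafkaVersionFromJarName(String jarName) {
        Matcher m = JAR_NAME.matcher(jarName);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a recognised Kafka jar name: " + jarName);
        }
        return m.group(2);
    }

    /** Compares dotted numeric versions component by component; missing components count as 0. */
    static int compareVersions(String a, String b) {
        String[] pa = a.split("\\.");
        String[] pb = b.split("\\.");
        int n = Math.max(pa.length, pb.length);
        for (int i = 0; i < n; i++) {
            int x = i < pa.length ? Integer.parseInt(pa[i]) : 0;
            int y = i < pb.length ? Integer.parseInt(pb[i]) : 0;
            if (x != y) {
                return Integer.compare(x, y);
            }
        }
        return 0;
    }

    /** Throws with an explicit message when the broker is older than the required minimum. */
    static void requireAtLeast(String brokerVersion, String minimum) {
        if (compareVersions(brokerVersion, minimum) < 0) {
            throw new IllegalStateException("Broker version " + brokerVersion
                    + " is older than the required " + minimum);
        }
    }

    public static void main(String[] args) {
        // Hypothetical jar name; pass the real file name as the first argument.
        String jar = args.length > 0 ? args[0] : "kafka_2.10-0.8.2.0.jar";
        String version = kafkaVersionFromJarName(jar);
        System.out.println("Kafka version from jar name: " + version);
        try {
            requireAtLeast(version, "0.9");
            System.out.println("Broker is new enough for a 0.9+ client.");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Run it with the real jar file name from the broker's libs directory as the only argument. Vendor builds may append their own stack version to the Kafka version, so treat the leading components as the meaningful ones.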
> Hi Conrad,
> Kafka consumer in Beam is 0.9 or above. Almost certainly you are running
> 0.9 or newer servers. I don't think the new 0.9 client can talk to old brokers
> (but 0.9 brokers can talk to older clients). How did you confirm the server
> version? You can check the server log. But I might be mistaken about this
> incompatibility.
> Can you post 'jstack' of the application when it is stuck (assuming you
> are running using DirectRunner)?
> > Kafka 0.8 requires a zookeeper connect property to be set, but I can’t
> > set this using updateConsumerProperties as the value gets discarded.
> KafkaIO does not place any restrictions on ConsumerConfig except for key
> and value deserializers. The message about discarding these would be from
> Kafka consumer itself. I think it ignores configuration settings that it
> does not know about and logs them.
> Raghu.
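To see ahead of time which settings the consumer is likely to drop, the supplied properties can be compared against the keys it understands. This is a rough sketch under stated assumptions: KNOWN_KEYS is a hand-picked subset of 0.9 consumer settings rather than the full list, the class and method names are hypothetical, and "zk-host:2181" is a placeholder value. It does not call Kafka at all; it only illustrates why a 0.8-style zookeeper.connect entry would be reported as unused.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Sketch: list supplied consumer properties that are not in a set of known keys. */
final class UnknownConfigReport {

    // Assumption: a small illustrative subset of the 0.9 consumer settings, not the full list.
    static final Set<String> KNOWN_KEYS = new HashSet<>(Arrays.asList(
            "bootstrap.servers",
            "group.id",
            "auto.offset.reset",
            "enable.auto.commit",
            "receive.buffer.bytes"));

    /** Returns the supplied keys that are not in KNOWN_KEYS, sorted alphabetically. */
    static List<String> unknownKeys(Map<String, Object> supplied) {
        List<String> unknown = new ArrayList<>();
        for (String key : supplied.keySet()) {
            if (!KNOWN_KEYS.contains(key)) {
                unknown.add(key);
            }
        }
        Collections.sort(unknown);
        return unknown;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("auto.offset.reset", "earliest");
        props.put("receive.buffer.bytes", 1024 * 1024);
        props.put("zookeeper.connect", "zk-host:2181"); // placeholder value, 0.8-style setting

        for (String key : unknownKeys(props)) {
            System.out.println("Not a known consumer setting, likely to be ignored: " + key);
        }
    }
}
```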
> Hi Raghu,
> Yeah, the job just waits and does nothing. It reports the correct offset
> (this changes when I use ‘earliest’ or ‘latest’), but nothing is received.
> There are definitely messages in the queue. I am using Beam 0.6.
> With my other application using Flink, I am using the FlinkKafkaConsumer08
> libraries (and not the FlinkKafkaConsumer09)  as I am sure I had a similar
> problem then i.e. no errors reported and appears to work fine, but nothing
> actually received in the streaming job.
> Kafka 0.8 requires a zookeeper connect property to be set, but I can’t set
> this using updateConsumerProperties as the value gets discarded.
> Thanks
> Conrad
> Conrad,
> It does not look like there is a version incompatibility. You would see
> errors during initialization otherwise. Log line "INFO: Reader-0:
> reading from test-http-logs-json-0 starting at offset 0" says the Kafka
> consumer was able to connect to servers.
> Does the job just wait inside What is the version of Beam you
> are using? If it is just waiting for records, please ensure the topic has
> messages (using etc). Alternately you can try
> reading from another topic you mentioned that worked fine.
> Raghu.
> Hi,
> Ok, good to know I’m not going totally mad.
> I think I may have been running around in circles unnecessarily <grrr>
> I am using kafka as part of an HDP installation (with Ambari). The Ambari
> interface is reporting my kafka version as and indeed the output
> given previously gives
> INFO: Kafka version : (which doesn’t make particular sense). So I
> have ssh’d onto the server and looked at the libs for kafka and they are
> kafka_2.10- so I’m guessing something is not quite
> right. This is incredibly frustrating as it looks like I am trying to
> connect to v0.9 kafka but it’s actually v0.8 which clearly is very
> different wrt/ zookeeper etc. This is also backed up by trying the
> (and all the other tools) ask for mandatory
> zookeeper options which shouldn’t be necessary as far as I understand it
> for v0.9.
> I am currently looking at
> 63bce07d8c6cc5e610ad24e915e2585fef582567/runners/flink/
> examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/
> to see if I can use this code somehow to use Beam
> with Kafka v0.8. I am really hoping to as I have no option to upgrade
> currently and I really like the abstraction of Beam.
> Thanks
> Conrad
> @Conrad,
> Your code should be good to go, I can run it in my local env. There are two
> points you may want to check:
> 1). does the topic have data there, you can confirm with kafka cli '
> *bin/*';
> 2). is the port in bootstrapServers right? By default it's 9092.
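The port question in point 2 can be checked without Kafka tooling. Below is a minimal sketch, assuming the bootstrap list is the usual comma-separated host:port string; the class and method names are hypothetical. It parses the list and tries a plain TCP connect to each entry. A successful connect only shows that something is listening on that port, not that it is a Kafka broker of a compatible version.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

/** Sketch: parse a bootstrap server list and probe each host:port over TCP. */
final class BootstrapServersCheck {

    /** Splits "host1:port1,host2:port2" into unresolved socket addresses. */
    static List<InetSocketAddress> parse(String bootstrapServers) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got '" + trimmed + "'");
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    /** Returns true if a TCP connection can be opened within the timeout. */
    static boolean canConnect(InetSocketAddress address, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // Resolve the host name here so that lookup failures are reported as "not reachable".
            socket.connect(new InetSocketAddress(address.getHostString(), address.getPort()),
                    timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length == 0) {
            System.out.println("usage: BootstrapServersCheck host1:port1,host2:port2");
            return;
        }
        for (InetSocketAddress address : parse(args[0])) {
            String status = canConnect(address, 2000) ? "reachable" : "NOT reachable";
            System.out.println(address.getHostString() + ":" + address.getPort() + " " + status);
        }
    }
}
```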
> Hi,
> New to the group – ‘hello’!
> Just starting to look into Beam and I very much like the concepts, but
> have rather fallen at the first hurdle – that being trying to subscribe to
> a kafka topic and process results.
> Very simply the following code doesn’t receive any records (the data
> is going into the queue) – I just get nothing.
> I have tried on both direct-runner and flink-runner (using the Quickstart
> as a base for options, mvn profile etc.)
> Code
> Pipeline p = Pipeline.create(options);
> List<String> topics = ImmutableList.of("test-http-logs-json");
> PCollection<String> logs = p.apply(KafkaIO.<String, String>read()
>         .withBootstrapServers(
>                 "datanode2-cm1.mis-cds.local:6667,datanode3-cm1.mis-cds.local:6667,datanode6-cm1.mis-cds.local:6667")
>         .withTopics(topics)
>         .withKeyCoder(StringUtf8Coder.of())
>         .withValueCoder(StringUtf8Coder.of())
>         .withMaxNumRecords(10)
>         .updateConsumerProperties(ImmutableMap.<String, Object>builder()
>                 .put("auto.offset.reset", (Object) "earliest")
>                 .put(" <>", (Object) "http-logs-beam-json")
>                 .put("", (Object) "true")
>                 .put("receive.buffer.bytes", 1024 * 1024)
>                 .build())
>         // set a Coder for Key and Value
>         .withoutMetadata())
>         .apply("Transform ", MapElements.via(new SimpleFunction<KV<String, String>, String>() {
>             @Override
>             public String apply(KV<String, String> input) {
>                 log.debug("{}", input.getValue());
>                 return input.getKey() + " " + input.getValue();
>             }
>         }));
> Result:
> Any suggestions as I have been on this now for over a day with various
> attempts but nothing comes through.
> When connecting to a different topic (which I subscribe directly via Flink
> in another application and get data from), I can set the .put(
> "auto.offset.reset", (Object) "earliest") to earliest and latest and
> see different values for the offset so kafka appears to be available/
> visible etc.
> Many thanks
> Conrad
