Thanks Raghu,

When I try to run it on Flink using the incubator-beam code, i.e.

flink run -c com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample \
  target/beam-1.0-SNAPSHOT.jar \
  --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out

I get this:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852)
        at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
        at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
        at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
        at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
        at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
        at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
        at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
        ... 6 more
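
From the stack trace it looks like the pipeline is being evaluated by the 
DirectPipelineRunner, and my command line above doesn't pass any --runner 
argument, so my guess is the example is falling back to the default runner. 
For what it's worth, this is roughly what I was going to try next inside the 
example's main() -- the FlinkPipelineOptions/FlinkPipelineRunner names are my 
assumption from skimming the incubator-beam sources, so apologies if that's 
not how the Flink runner is actually wired up:

    // Guess only: set the runner explicitly instead of relying on the default
    // (which appears to be DirectPipelineRunner). FlinkPipelineOptions and
    // FlinkPipelineRunner are the class names I *think* the Flink runner uses --
    // I haven't confirmed them against my checkout.
    FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(FlinkPipelineOptions.class);
    options.setRunner(FlinkPipelineRunner.class);   // instead of DirectPipelineRunner
    options.setStreaming(true);                     // KafkaIO is an unbounded source
    Pipeline pipeline = Pipeline.create(options);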

Any ideas?

Bill

> On Mar 18, 2016, at 2:47 PM, Raghu Angadi <[email protected]> wrote:
> 
> Thanks for trying it. 
> 
> I fixed the CheckStyle error (not sure why my build is not failing). Let me 
> know if you see any issues running with Beam. I haven't tried it; I should. 
> In fact, Daniel Halperin says my patch should be against Beam...
> 
> Raghu.
> 
> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy <[email protected]> wrote:
> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and Raghu for pointing 
> me to working code.
> 
> I’m in the middle of a hack day at the moment, so the speed of your responses 
> has been very welcome.
> 
> In the first instance, I’ll try using your changes, Raghu. I’ve cloned your 
> repo, switched to the kafka branch and built both contrib/kafka and 
> contrib/examples/kafka. The contrib/kafka initially failed with a CheckStyle 
> error 
> (/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12:
>  'private' modifier out of order with the JLS suggestions)… I’ve fixed that 
> in my local clone and now it’s building fine. I hope to be able to run your 
> contrib unchanged on top of the incubator-beam codebase, which will be what I 
> attempt to do now.
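> 
> (In case anyone else hits the same CheckStyle complaint: the fix is just 
> reordering the modifiers so the access modifier comes first, per the JLS 
> recommendation. The method below is made up purely for illustration -- it is 
> not the actual line 683 of KafkaIO.java.)
> 
>     // Before -- flagged by CheckStyle's ModifierOrder rule:
>     //   static private void illustrativeHelper() { }
>     // After -- JLS order puts the access modifier first:
>     private static void illustrativeHelper() { }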
> 
> Thanks again to all, for your swift help.
> 
> Bill
> 
>> On Mar 18, 2016, at 12:55 PM, Raghu Angadi <[email protected]> wrote:
>> 
>> Hi Bill,
>> 
>> We have a fairly well-tested patch for KafkaIO (PR #121, 
>> https://github.com/GoogleCloudPlatform/DataflowJavaSDK/pull/121). It will 
>> be merged soon. The example there keeps track of the top hashtags in a 
>> 10-minute sliding window and writes the results to another Kafka topic. 
>> Please try it if you can. It is well tested on Google Cloud Dataflow; I have 
>> not run it using the Flink runner.
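>> 
>> Roughly, the example pipeline has the following shape (heavily simplified; 
>> the KafkaIO builder methods and the helper DoFn/comparator names below are 
>> placeholders from memory, so please check the PR for the real API):
>> 
>>     pipeline
>>         .apply(KafkaIO.read()                               // read tweets from the input topic
>>             .withBootstrapServers(options.getBootstrapServers())
>>             .withTopics(ImmutableList.of(options.getTopics())))
>>         .apply(ParDo.of(new ExtractHashtagsFn()))           // placeholder DoFn: emit hashtags per record
>>         .apply(Window.<String>into(
>>             SlidingWindows.of(Duration.standardMinutes(10)))) // 10-minute sliding windows
>>         .apply(Count.<String>perElement())                  // hashtag -> count within each window
>>         .apply(Top.of(10, new CompareByCountFn())           // top 10 hashtags per window
>>             .withoutDefaults())
>>         .apply(ParDo.of(new KafkaWriterFn(options.getOutputTopic()))); // placeholder: write back to Kafka
>>     pipeline.run();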
>> 
>> Raghu.
>> 
>> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas <[email protected]> wrote:
>> Hello Bill,
>> 
>> This is a known limitation of the Flink Runner. 
>> There is a JIRA issue tracking it: 
>> https://issues.apache.org/jira/browse/BEAM-127
>> 
>> A wrapper for Flink sinks will come soon, and as Beam evolves
>> a more Beam-y solution will come as well.
>> 
>> Kostas
>>> On Mar 18, 2016, at 5:23 PM, William McCarthy <[email protected]> wrote:
>>> 
>>> Hi,
>>> 
>>> I’m trying to write a proof-of-concept which takes messages from Kafka, 
>>> transforms them using Beam on Flink, then pushes the results onto a 
>>> different Kafka topic.
>>> 
>>> I’ve used the KafkaWindowedWordCountExample as a starting point, and that’s 
>>> doing the first part of what I want to do, but it outputs to text files as 
>>> opposed to Kafka. FlinkKafkaProducer08 looks promising, but I can’t figure 
>>> out how to plug it into the pipeline. I was thinking that it would be 
>>> wrapped with an UnboundedFlinkSink, or some such, but that doesn’t seem to 
>>> exist.
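>>> 
>>> Roughly what I was imagining is the following (pseudocode -- UnboundedFlinkSink 
>>> is made up, which is exactly the piece I can't find; the FlinkKafkaProducer08 
>>> constructor is the one from Flink 1.0, and the topic name is just illustrative):
>>> 
>>>     Properties producerProps = new Properties();
>>>     producerProps.setProperty("bootstrap.servers", "localhost:9092");
>>> 
>>>     windowedResults.apply(                                   // PCollection<String> of results
>>>         Write.to(UnboundedFlinkSink.of(                      // <-- hypothetical wrapper, doesn't seem to exist
>>>             new FlinkKafkaProducer08<>("output_topic",
>>>                                        new SimpleStringSchema(),
>>>                                        producerProps))));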
>>> 
>>> Any advice or thoughts on what I’m trying to do?
>>> 
>>> I’m running the latest incubator-beam (as of last night from Github), Flink 
>>> 1.0.0 in cluster mode and Kafka 0.9.0.1, all on Google Compute Engine 
>>> (Debian Jessie).
>>> 
>>> Thanks,
>>> 
>>> Bill McCarthy
>> 
>> 
> 
> 
