Thanks Raghu,
When I try to run it on Flink using the incubator-beam code, i.e.
flink run -c com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
I get this:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
    at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
    at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
    at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    ... 6 more
Any ideas?
Bill
> On Mar 18, 2016, at 2:47 PM, Raghu Angadi <[email protected]> wrote:
>
> Thanks for trying it.
>
> I fixed the CheckStyle error (not sure why my build is not failing). Let me
> know if you see any issues running with Beam. I haven't tried it. I should.
> In fact, Daniel Halperin says my patch should be against Beam.
>
> Raghu.
>
> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy <[email protected]> wrote:
> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and Raghu for pointing
> me to working code.
>
> I’m in the middle of a hack day at the moment, so the speed of your responses
> has been very welcome.
>
> In the first instance, I’ll try using your changes, Raghu. I’ve cloned your
> repo, switched to the kafka branch and built both contrib/kafka and
> contrib/examples/kafka. The contrib/kafka build initially failed with a CheckStyle error
> (/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12:
> 'private' modifier out of order with the JLS suggestions)… I’ve fixed that
> in my local clone and now it’s building fine. I hope to be able to run your
> contrib unchanged on top of the incubator-beam codebase, which will be what I
> attempt to do now.
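>
> The CheckStyle fix itself was trivial: just reordering the modifiers into the order the JLS recommends. As a made-up illustration (this is not the actual declaration at KafkaIO.java:683):
>
>     // Before (flagged by CheckStyle's ModifierOrder check):
>     //   static private volatile boolean consumerStarted;
>     // After ('private' comes before 'static'):
>     private static volatile boolean consumerStarted;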
>
> Thanks again to all, for your swift help.
>
> Bill
>
>> On Mar 18, 2016, at 12:55 PM, Raghu Angadi <[email protected]> wrote:
>>
>> Hi Bill,
>>
>> We have a fairly well-tested patch for KafkaIO (pr #121
>> <https://github.com/GoogleCloudPlatform/DataflowJavaSDK/pull/121>). It will
>> be merged soon. The example there keeps track of the top hashtags in a
>> 10-minute sliding window and writes the results to another Kafka topic.
>> Please try it if you can. It is well tested on Google Cloud Dataflow; I have
>> not run it using the Flink runner.
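>>
>> To give a feel for it, the pipeline in the example is shaped roughly like the
>> sketch below. The exact KafkaIO builder methods and the helper DoFn names here
>> are approximate (written from memory), and the one-minute slide period is just
>> illustrative; please go by the code in the PR:
>>
>>     Pipeline pipeline = Pipeline.create(options);
>>
>>     pipeline
>>         .apply(KafkaIO.read()                                      // unbounded read from Kafka
>>             .withBootstrapServers(options.getBootstrapServers())
>>             .withTopics(options.getTopics())
>>             .withValueCoder(StringUtf8Coder.of())
>>             .withoutMetadata())
>>         .apply(Values.<String>create())
>>         .apply(ParDo.of(new ExtractHashtagsFn()))                  // one output element per #hashtag
>>         .apply(Window.<String>into(
>>             SlidingWindows.of(Duration.standardMinutes(10))
>>                           .every(Duration.standardMinutes(1))))
>>         .apply(Count.<String>perElement())                         // KV<hashtag, count> per window
>>         .apply(Top.of(10, new KV.OrderByValue<String, Long>())
>>                   .withoutDefaults())                              // top hashtags in each window
>>         .apply(ParDo.of(new WriteTopHashtagsFn()));                // writes the results to the output topic
>>
>>     pipeline.run();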
>>
>> Raghu.
>>
>> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas <[email protected]> wrote:
>> Hello Bill,
>>
>> This is a known limitation of the Flink Runner.
>> There is a JIRA issue for that
>> https://issues.apache.org/jira/browse/BEAM-127
>>
>> A wrapper for Flink sinks will come soon, and as Beam evolves,
>> a more Beam-y solution will come as well.
>>
>> Kostas
>>> On Mar 18, 2016, at 5:23 PM, William McCarthy <[email protected]> wrote:
>>>
>>> Hi,
>>>
>>> I’m trying to write a proof-of-concept which takes messages from Kafka,
>>> transforms them using Beam on Flink, then pushes the results onto a
>>> different Kafka topic.
>>>
>>> I’ve used the KafkaWindowedWordCountExample as a starting point, and that’s
>>> doing the first part of what I want to do, but it outputs to text files as
>>> opposed to Kafka. FlinkKafkaProducer08 looks promising, but I can’t figure
>>> out how to plug it into the pipeline. I was thinking that it would be
>>> wrapped with an UnboundedFlinkSink, or some such, but that doesn’t seem to
>>> exist.
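>>>
>>> Failing a proper sink, I suppose I could fall back to writing the output from a
>>> plain DoFn that wraps the Kafka producer client directly. A rough sketch of what
>>> I have in mind (broker and topic strings are just placeholders, and the producer
>>> lifecycle and serialization details are glossed over):
>>>
>>>     class KafkaWriterFn extends DoFn<String, Void> {
>>>       private transient KafkaProducer<String, String> producer;
>>>
>>>       @Override
>>>       public void startBundle(Context c) {
>>>         Properties props = new Properties();
>>>         props.put("bootstrap.servers", "cl-pu4p:9092");                  // placeholder broker
>>>         props.put("key.serializer", StringSerializer.class.getName());
>>>         props.put("value.serializer", StringSerializer.class.getName());
>>>         producer = new KafkaProducer<>(props);                           // one producer per bundle
>>>       }
>>>
>>>       @Override
>>>       public void processElement(ProcessContext c) {
>>>         producer.send(new ProducerRecord<>("test_out", c.element()));    // placeholder topic
>>>       }
>>>
>>>       @Override
>>>       public void finishBundle(Context c) {
>>>         producer.close();
>>>       }
>>>     }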
>>>
>>> Any advice or thoughts on what I’m trying to do?
>>>
>>> I’m running the latest incubator-beam (as of last night from Github), Flink
>>> 1.0.0 in cluster mode and Kafka 0.9.0.1, all on Google Compute Engine
>>> (Debian Jessie).
>>>
>>> Thanks,
>>>
>>> Bill McCarthy
>>
>>
>
>