Thanks Dan and Aljoscha,

I noticed the DirectPipelineRunner in the stack trace, and thought it was odd. 
I had a look in the code to see how I could change that, but came up empty: 
thanks for the README link.

…however, then Dan’s mail came in, which mentioned “sources”, and I changed my 
example to just use the KafkaWriter, without messing with the 
FlinkKafkaConsumer08 that I had previously been using. This proved fruitful, 
and now I have my example running with the FlinkKafkaConsumer08 as a source, 
and the new KafkaWriter as a sink.

So, my example is now working for what I need. Thanks to you all for your help.

In the interests of closing the loop, I’ll switch the runner in the 
TopHashtagsExample, per the README, and see what I get.
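
As a rough illustration of why the choice of runner matters here, below is a toy 
model of what the stack trace shows. It is not the real SDK or Flink runner code: 
the class RunnerSketch, the method execute, and the lists of supported transforms 
are all made up for this sketch. The assumption, taken from the replies in this 
thread, is that the default (direct) runner has no evaluator for an unbounded 
read, while the Flink runner can handle one.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model only: hypothetical names, not the Dataflow/Beam SDK API.
public class RunnerSketch {
    // Which transforms each (hypothetical) runner has an evaluator for.
    private static final Map<String, Set<String>> EVALUATORS = new HashMap<>();
    static {
        EVALUATORS.put("direct",
            new HashSet<>(Arrays.asList("Read(BoundedSource)")));
        EVALUATORS.put("flink",
            new HashSet<>(Arrays.asList("Read(BoundedSource)", "Read(UnboundedKafkaSource)")));
    }

    // Runs one transform on the named runner; a null name falls back to "direct",
    // mirroring what happens when no runner is set in the pipeline options.
    static String execute(String runnerName, String transform) {
        String name = (runnerName == null) ? "direct" : runnerName;
        Set<String> supported = EVALUATORS.get(name);
        if (supported == null) {
            throw new IllegalArgumentException("unknown runner: " + name);
        }
        if (!supported.contains(transform)) {
            // Same shape of failure as the IllegalStateException in the stack trace.
            throw new IllegalStateException("no evaluator registered for " + transform);
        }
        return name + " executed " + transform;
    }

    public static void main(String[] args) {
        String transform = "Read(UnboundedKafkaSource)";
        try {
            execute(null, transform); // no runner chosen, so the default is used
        } catch (IllegalStateException e) {
            System.out.println("default runner failed: " + e.getMessage());
        }
        System.out.println(execute("flink", transform));
    }
}
```

In the real example the equivalent step is setting the runner on the pipeline 
options as the README describes, rather than passing a name around like this.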

Bill

> On Mar 18, 2016, at 4:21 PM, Aljoscha Krettek <[email protected]> wrote:
> 
> Hi,
> looks like the example is being executed with the DirectPipelineRunner which 
> does not seem to be able to cope with UnboundedSource. You need to set the 
> runner to the FlinkRunner in the example code as described here: 
> https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example
> 
> The Flink runner should be able to deal with UnboundedSource but has the 
> limitation that sources are always parallelism=1 (this is being worked on, 
> however).
> 
> Cheers,
> Aljoscha
>> On 18 Mar 2016, at 20:56, Dan Halperin <[email protected]> wrote:
>> 
>> Looks like the Flink runner may not yet support arbitrary code written with 
>> the UnboundedSource API. That is, it looks like the Flink runner expects the 
>> sources to get translated away.
>> 
>> Max?
>> 
>> Dan
>> 
>> On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy 
>> <[email protected]> wrote:
>> Thanks Raghu,
>> 
>> When I try to run it on flink using the incubator-beam code, i.e.
>> 
>> flink run -c com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample \
>>   target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092 \
>>   --topics=test_in --outputTopic=test_out
>> 
>> I get this:
>> 
>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>      at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>      at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>      at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>      at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>      at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>      at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>      at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>> Caused by: java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
>>      at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852)
>>      at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>>      at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>      at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>      at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>>      at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>>      at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
>>      at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
>>      at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
>>      at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>>      at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>>      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>      at java.lang.reflect.Method.invoke(Method.java:498)
>>      at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>      ... 6 more
>> 
>> Any ideas?
>> 
>> Bill
>> 
>>> On Mar 18, 2016, at 2:47 PM, Raghu Angadi <[email protected]> wrote:
>>> 
>>> Thanks for trying it. 
>>> 
>>> I fixed the CheckStyle error (not sure why my build is not failing). Let 
>>> me know if you see any issues running with Beam. I haven't tried it. I 
>>> should. In fact Daniel Halperin says my patch should be against Beam.
>>> 
>>> Raghu.
>>> 
>>> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy 
>>> <[email protected]> wrote:
>>> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and Raghu for 
>>> pointing me to working code.
>>> 
>>> I’m in the middle of a hack day at the moment, so the speed of your 
>>> responses has been very welcome.
>>> 
>>> In the first instance, I’ll try using your changes, Raghu. I’ve cloned your 
>>> repo, switched to the kafka branch and built both contrib/kafka and 
>>> contrib/examples/kafka. The contrib/kafka initially failed with a 
>>> CheckStyle error 
>>> (/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12:
>>>  'private' modifier out of order with the JLS suggestions)… I’ve fixed that 
>>> in my local clone and now it’s building fine. I hope to be able to run your 
>>> contrib unchanged on top of the incubator-beam codebase, which will be what 
>>> I attempt to do now.
>>> 
>>> Thanks again to all, for your swift help.
>>> 
>>> Bill
>>> 
>>>> On Mar 18, 2016, at 12:55 PM, Raghu Angadi <[email protected]> wrote:
>>>> 
>>>> Hi Bill,
>>>> 
>>>> We have a fairly well-tested patch for KafkaIO (pr #121). It will be merged 
>>>> soon. The example there keeps track of top hashtags in a 10-minute sliding 
>>>> window and writes the results to another Kafka topic. Please try it if you 
>>>> can. It is well tested on Google Cloud Dataflow. I have not run it using 
>>>> the Flink runner.
>>>> 
>>>> Raghu.
>>>> 
>>>> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas 
>>>> <[email protected]> wrote:
>>>> Hello Bill,
>>>> 
>>>> This is a known limitation of the Flink Runner. 
>>>> There is a JIRA issue for that 
>>>> https://issues.apache.org/jira/browse/BEAM-127
>>>> 
>>>> A wrapper for Flink sinks will come soon and as Beam evolves,
>>>> a more Beam-y solution will come as well.
>>>> 
>>>> Kostas
>>>>> On Mar 18, 2016, at 5:23 PM, William McCarthy <[email protected]> 
>>>>> wrote:
>>>>> 
>>>>> Hi,
>>>>> 
>>>>> I’m trying to write a proof-of-concept which takes messages from Kafka, 
>>>>> transforms them using Beam on Flink, then pushes the results onto a 
>>>>> different Kafka topic.
>>>>> 
>>>>> I’ve used the KafkaWindowedWordCountExample as a starting point, and 
>>>>> that’s doing the first part of what I want to do, but it outputs to text 
>>>>> files as opposed to Kafka. FlinkKafkaProducer08 looks promising, but I 
>>>>> can’t figure out how to plug it into the pipeline. I was thinking that it 
>>>>> would be wrapped with an UnboundedFlinkSink, or some such, but that 
>>>>> doesn’t seem to exist.
>>>>> 
>>>>> Any advice or thoughts on what I’m trying to do?
>>>>> 
>>>>> I’m running the latest incubator-beam (as of last night from Github), 
>>>>> Flink 1.0.0 in cluster mode and Kafka 0.9.0.1, all on Google Compute 
>>>>> Engine (Debian Jessie).
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> Bill McCarthy
>>>> 
>>>> 
>>> 
>>> 
>> 
>> 
> 
