The pull request is here: https://github.com/apache/incubator-beam/pull/69

I'll merge it after the tests have passed and it has been reviewed.

On Wed, Mar 23, 2016 at 3:12 PM, Maximilian Michels <[email protected]> wrote:
> Hi William,
>
> I started working on a fix and better testing of the
> UnboundedSourceWrapper. I'll get back to you shortly with a pull
> request that we should be able to merge soon.
>
> - Max
>
> On Wed, Mar 23, 2016 at 1:33 PM, Aljoscha Krettek <[email protected]> wrote:
>> Hi,
>> as far as I can see from a quick glance the problem is that 
>> UnboundedSourceWrapper stores an instance of Reader that it gets from the 
>> Source. The Reader is not Serializable while the UnboundedSource is. I think 
>> the Reader should be initialized when actually running the source, in the 
>> run() method.
>>
>> Cheers,
>> Aljoscha
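
A minimal sketch of the change suggested above: keep only the serializable source in the wrapper and create the reader inside run(). SketchReader, SketchSource and SketchSourceWrapper are hypothetical stand-ins written for this illustration; they are not the Beam or Flink classes, and the actual fix in the pull request may look different.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical reader type; deliberately NOT Serializable, like the Reader in the thread.
interface SketchReader {
    boolean advance();
    String current();
}

// Hypothetical source type; Serializable, like the UnboundedSource in the thread.
class SketchSource implements Serializable {
    private static final long serialVersionUID = 1L;
    private final ArrayList<String> elements;

    SketchSource(List<String> elements) {
        this.elements = new ArrayList<>(elements);
    }

    SketchReader createReader() {
        final Iterator<String> it = elements.iterator();
        return new SketchReader() {
            private String current;

            public boolean advance() {
                if (!it.hasNext()) {
                    return false;
                }
                current = it.next();
                return true;
            }

            public String current() {
                return current;
            }
        };
    }
}

// Hypothetical wrapper: only the source is serialized; the reader is created in run().
class SketchSourceWrapper implements Serializable {
    private static final long serialVersionUID = 1L;
    private final SketchSource source;
    private transient SketchReader reader; // skipped by Java serialization

    SketchSourceWrapper(SketchSource source) {
        this.source = source;
    }

    List<String> run() {
        reader = source.createReader(); // initialized where the source actually runs
        List<String> out = new ArrayList<>();
        while (reader.advance()) {
            out.add(reader.current());
        }
        return out;
    }
}

class LazyReaderSketch {
    // Serializes and deserializes the wrapper, imitating shipping it to a worker.
    static SketchSourceWrapper roundTrip(SketchSourceWrapper wrapper) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(wrapper);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SketchSourceWrapper) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        SketchSourceWrapper wrapper =
            new SketchSourceWrapper(new SketchSource(Arrays.asList("a", "b", "c")));
        System.out.println(roundTrip(wrapper).run());
    }
}
```

Because the reader field is transient, Java serialization skips it, so the wrapper can be serialized even though the reader type is not Serializable; the reader only comes into existence where run() is called.
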
>>> On 23 Mar 2016, at 13:07, William McCarthy <[email protected]> 
>>> wrote:
>>>
>>> The first stack trace was from the latest runner, yes. I’ve pulled the very 
>>> latest, just now, and still get the same thing.
>>>
>>> When I write ‘pulled the very latest’, here’s what I mean (all of the 
>>> following commands, except the last, finished with success):
>>>
>>> $ cd incubator-beam
>>> $ git pull
>>> …some output, then success
>>> $ git branch
>>> * master
>>> $ mvn -DskipTests clean install
>>> …some output, then success
>>> $ cd <my project>
>>> $ mvn -DskipTests clean install
>>> …some output, then success
>>> $ <command as output below>
>>>
>>> Bill
>>>
>>>> On Mar 22, 2016, at 5:29 AM, Maximilian Michels <[email protected]> wrote:
>>>>
>>>> Hi William,
>>>>
>>>> Is the first stack trace from the latest master? Using only the simple
>>>> class name of the Runner should actually work now. I've also added a
>>>> test to explicitly test that.
>>>>
>>>> The latter error, as Aljoscha pointed out, is due to batch
>>>> execution. Perhaps we could make it explicit during startup which mode
>>>> we're executing in. I filed a JIRA issue for that:
>>>> https://issues.apache.org/jira/browse/BEAM-139
>>>>
>>>> - Max
>>>>
>>>> On Tue, Mar 22, 2016 at 10:00 AM, Aljoscha Krettek <[email protected]> 
>>>> wrote:
>>>>> Hi,
>>>>> for Flink the runner can internally either translate to a batch job or a 
>>>>> streaming job. Unbounded sources are not supported when running in batch 
>>>>> mode so you have to somehow specify that you want to have streaming mode. 
>>>>> StreamingOptions has a setStreaming method; maybe you can specify 
>>>>> “--streaming=true” on the command line to set that flag.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>> On 21 Mar 2016, at 23:39, William McCarthy <[email protected]> 
>>>>>> wrote:
>>>>>>
>>>>>> I’ve attempted to run the TopHashtagsExample again, using both 
>>>>>> ‘FlinkPipelineRunner’ and its fully qualified name. Both bomb out, 
>>>>>> though the latter gets further. I hope this helps; here is the output of 
>>>>>> both:
>>>>>>
>>>>>> $ flink run -c 
>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample 
>>>>>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner 
>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>>>
>>>>>> ------------------------------------------------------------
>>>>>> The program finished with the following exception:
>>>>>>
>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main 
>>>>>> method caused an error.
>>>>>>     at 
>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>     at 
>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>     at 
>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>     at 
>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>     at 
>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' 
>>>>>> specified 'FlinkPipelineRunner', supported pipeline runners 
>>>>>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner, 
>>>>>> DirectPipelineRunner]
>>>>>>     at 
>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1486)
>>>>>>     at 
>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:101)
>>>>>>     at 
>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:286)
>>>>>>     at 
>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>     at 
>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>     at 
>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>     at 
>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>     ... 6 more
>>>>>> Caused by: java.lang.ClassNotFoundException: FlinkPipelineRunner
>>>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>>     at java.lang.Class.forName0(Native Method)
>>>>>>     at java.lang.Class.forName(Class.java:264)
>>>>>>     at 
>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1473)
>>>>>>     ... 14 more
>>>>>>
>>>>>>
>>>>>> $ flink run -c 
>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample 
>>>>>> target/beam-1.0-SNAPSHOT.jar 
>>>>>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner 
>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>>>
>>>>>> ------------------------------------------------------------
>>>>>> The program finished with the following exception:
>>>>>>
>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main 
>>>>>> method caused an error.
>>>>>>     at 
>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>     at 
>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>     at 
>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>     at 
>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>     at 
>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>> Caused by: java.lang.UnsupportedOperationException: The transform 
>>>>>> Read(UnboundedKafkaSource) is currently not supported.
>>>>>>     at 
>>>>>> org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator.visitTransform(FlinkBatchPipelineTranslator.java:111)
>>>>>>     at 
>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>>>>>>     at 
>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>>     at 
>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>>     at 
>>>>>> com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>>>>>>     at 
>>>>>> com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>>>>>>     at 
>>>>>> org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:34)
>>>>>>     at 
>>>>>> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:130)
>>>>>>     at 
>>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:109)
>>>>>>     at 
>>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:50)
>>>>>>     at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>>>>>>     at 
>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>     at 
>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>     at 
>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>     at 
>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>     ... 6 more
>>>>>>
>>>>>>
>>>>>>> On Mar 21, 2016, at 12:23 PM, Raghu Angadi <[email protected]> wrote:
>>>>>>>
>>>>>>> Thanks Max.
>>>>>>>
>>>>>>> Bill McCarthy,
>>>>>>> I know you are unblocked and KafkaWriter is good enough. Please try 
>>>>>>> KafkaIO source from my branch with Flink runner if you get a chance.
>>>>>>>
>>>>>>> thanks,
>>>>>>> Raghu.
>>>>>>>
>>>>>>> On Mon, Mar 21, 2016 at 6:54 AM, Jean-Baptiste Onofré 
>>>>>>> <[email protected]> wrote:
>>>>>>> Thanks for the update Max !
>>>>>>>
>>>>>>> Regards
>>>>>>> JB
>>>>>>>
>>>>>>>
>>>>>>> On 03/21/2016 02:39 PM, Maximilian Michels wrote:
>>>>>>> FYI: The Runner registration has been fixed. The Flink runner
>>>>>>> explicitly registers as of [1]. Also, the SDK tries to look up the
>>>>>>> PipelineRunner class in case it has not been registered [2].
>>>>>>>
>>>>>>> [1] https://github.com/apache/incubator-beam/pull/40
>>>>>>> [2] https://github.com/apache/incubator-beam/pull/61
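
A rough sketch of the lookup behaviour described above: a runner name is first checked against the registered runners, and if it is not found there it is tried as a class name. RunnerLookupSketch and DemoRunner are hypothetical names made up for this illustration; this is not the SDK's PipelineOptionsFactory code, only an approximation of the behaviour that the stack traces in this thread suggest.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a runner class; not a real Beam type.
class DemoRunner {
}

// Hypothetical resolver, assuming registration is keyed by the simple class name.
class RunnerLookupSketch {
    private final Map<String, Class<?>> registered = new LinkedHashMap<>();

    // Explicit registration under the simple class name.
    void register(Class<?> runnerClass) {
        registered.put(runnerClass.getSimpleName(), runnerClass);
    }

    // Resolves a --runner value: registered simple name first, then class lookup.
    Class<?> resolve(String name) {
        Class<?> byName = registered.get(name);
        if (byName != null) {
            return byName;
        }
        try {
            // Fallback for runners that were never registered.
            return Class.forName(name);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException(
                "Unknown 'runner' specified '" + name
                    + "', supported pipeline runners " + registered.keySet(), e);
        }
    }

    public static void main(String[] args) {
        RunnerLookupSketch lookup = new RunnerLookupSketch();
        lookup.register(DemoRunner.class);
        System.out.println(lookup.resolve("DemoRunner"));
        System.out.println(lookup.resolve(DemoRunner.class.getName()));
    }
}
```

With only the fallback in place, a simple name such as FlinkPipelineRunner would still fail with a ClassNotFoundException unless the class sits in the default package, which is why explicit registration matters for the short form.
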
>>>>>>>
>>>>>>> On Sat, Mar 19, 2016 at 6:43 PM, Maximilian Michels <[email protected]> 
>>>>>>> wrote:
>>>>>>> Great to see such a lively discussion here.
>>>>>>>
>>>>>>> I think we'll support sinks through the Write interface (like in
>>>>>>> batched execution) and also have a dedicated wrapper for the Flink
>>>>>>> sinks. This is a very pressing but easy to solve issue of the Flink
>>>>>>> runner. Expect it to be in next week.
>>>>>>>
>>>>>>> Also, the proper registration of the runner is about to be merged.
>>>>>>> We just need an ok from the contributor to merge the changes.
>>>>>>>
>>>>>>> Best,
>>>>>>> Max
>>>>>>>
>>>>>>> On Sat, Mar 19, 2016 at 12:42 AM, Dan Halperin <[email protected]> 
>>>>>>> wrote:
>>>>>>> Thanks Bill!
>>>>>>>
>>>>>>> Filed https://issues.apache.org/jira/browse/BEAM-136, but I'm glad it's 
>>>>>>> not
>>>>>>> blocking you!
>>>>>>>
>>>>>>> On Fri, Mar 18, 2016 at 4:04 PM, William McCarthy
>>>>>>> <[email protected]> wrote:
>>>>>>>
>>>>>>> I tried that, but still no dice. Just to be clear, it’s not a blocker
>>>>>>> for me, given that I have my example running, but for your information
>>>>>>> the exception is below.
>>>>>>>
>>>>>>> I’ll watch the commit log on the beam incubator and look forward to
>>>>>>> deleting my copy of Raghu’s contributions when they’re merged to master.
>>>>>>>
>>>>>>> Thanks again for everyone’s help,
>>>>>>>
>>>>>>> Bill
>>>>>>>
>>>>>>>
>>>>>>> Command followed by exception:
>>>>>>>
>>>>>>> $ flink run -c
>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>>>> target/beam-1.0-SNAPSHOT.jar
>>>>>>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner
>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>>>>
>>>>>>> ------------------------------------------------------------
>>>>>>> The program finished with the following exception:
>>>>>>>
>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>>>> method caused an error.
>>>>>>> at
>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>> at
>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>> at
>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>> at
>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' 
>>>>>>> specified
>>>>>>> 'org.apache.beam.runners.flink.FlinkPipelineRunner', supported pipeline
>>>>>>> runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
>>>>>>> DirectPipelineRunner]
>>>>>>> at
>>>>>>> com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>>>>>>> at
>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>>>>>>> at
>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>>>>>>> at
>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>>>>>>> at
>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>> at
>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>> at
>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>> at
>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>> ... 6 more
>>>>>>>
>>>>>>> On Mar 18, 2016, at 5:35 PM, Thomas Groh <[email protected]> wrote:
>>>>>>>
>>>>>>> I don't believe the FlinkPipelineRunner is registered the same way the
>>>>>>> Dataflow & Direct Pipeline runners are registered; using
>>>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner should work
>>>>>>>
>>>>>>> On Fri, Mar 18, 2016 at 2:10 PM, William McCarthy
>>>>>>> <[email protected]> wrote:
>>>>>>>
>>>>>>> Thanks Dan,
>>>>>>>
>>>>>>> I tried that, but getting the below. Note that the jar contains the
>>>>>>> FlinkPipelineRunner.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> % jar -tf target/beam-1.0-SNAPSHOT.jar | grep FlinkPipeline
>>>>>>> org/apache/beam/runners/flink/FlinkPipelineRunner.class
>>>>>>> org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.class
>>>>>>> org/apache/beam/runners/flink/FlinkPipelineOptions.class
>>>>>>> org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.class
>>>>>>>
>>>>>>> % flink run -c
>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>>>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner
>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>>>>
>>>>>>> ------------------------------------------------------------
>>>>>>> The program finished with the following exception:
>>>>>>>
>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>>>> method caused an error.
>>>>>>> at
>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>> at
>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>> at
>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>> at
>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' 
>>>>>>> specified
>>>>>>> 'FlinkPipelineRunner', supported pipeline runners
>>>>>>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
>>>>>>> DirectPipelineRunner]
>>>>>>> at
>>>>>>> com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>>>>>>> at
>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>>>>>>> at
>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>>>>>>> at
>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>>>>>>> at
>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>> at
>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>> at
>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>> at
>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>> ... 6 more
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mar 18, 2016, at 5:00 PM, Dan Halperin <[email protected]> wrote:
>>>>>>>
>>>>>>> Thanks for catching that, Aljoscha!
>>>>>>>
>>>>>>> Note that the Flink runner should be available via a command-line option
>>>>>>> as well: --runner=FlinkPipelineRunner.
>>>>>>>
>>>>>>> The list of valid values for that flag is computed by walking the
>>>>>>> classpath at runtime, so as long as the Flink jar is present it'll work.
>>>>>>>
>>>>>>> On Fri, Mar 18, 2016 at 1:21 PM, Aljoscha Krettek <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>> looks like the example is being executed with the DirectPipelineRunner
>>>>>>> which does not seem to be able to cope with UnboundedSource. You need 
>>>>>>> to set
>>>>>>> the runner to the FlinkRunner in the example code as described here:
>>>>>>> https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example
>>>>>>>
>>>>>>> The Flink runner should be able to deal with UnboundedSource but has the
>>>>>>> limitation that sources are always parallelism=1 (this is being worked 
>>>>>>> on,
>>>>>>> however).
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Aljoscha
>>>>>>> On 18 Mar 2016, at 20:56, Dan Halperin <[email protected]> wrote:
>>>>>>>
>>>>>>> Looks like the Flink runner may not yet support arbitrary code written
>>>>>>> with the UnboundedSource API. That is, it looks like the Flink runner
>>>>>>> expects the sources to get translated away.
>>>>>>>
>>>>>>> Max?
>>>>>>>
>>>>>>> Dan
>>>>>>>
>>>>>>> On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy
>>>>>>> <[email protected]> wrote:
>>>>>>> Thanks Raghu,
>>>>>>>
>>>>>>> When I try to run it on flink using the incubator-beam code, i.e.
>>>>>>>
>>>>>>> flink run -c
>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>>>> target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092
>>>>>>> --topics=test_in --outputTopic=test_out
>>>>>>>
>>>>>>> I get this:
>>>>>>>
>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>>>> method caused an error.
>>>>>>>      at
>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>>      at
>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>      at
>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>      at
>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>      at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>      at
>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>>      at
>>>>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>>> Caused by: java.lang.IllegalStateException: no evaluator registered
>>>>>>> for Read(UnboundedKafkaSource)
>>>>>>>      at
>>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852)
>>>>>>>      at
>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>>>>>>>      at
>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>>>      at
>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>>>      at
>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>>>>>>>      at
>>>>>>> com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>>>>>>>      at
>>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
>>>>>>>      at
>>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
>>>>>>>      at
>>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
>>>>>>>      at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>>>>>>>      at
>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>>>>>>>      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>      at
>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>      at
>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>      at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>      at
>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>      ... 6 more
>>>>>>>
>>>>>>> Any ideas?
>>>>>>>
>>>>>>> Bill
>>>>>>>
>>>>>>> On Mar 18, 2016, at 2:47 PM, Raghu Angadi <[email protected]> wrote:
>>>>>>>
>>>>>>> Thanks for trying it.
>>>>>>>
>>>>>>> I fixed the CheckStyle error (not sure why my build is not failing).
>>>>>>> Let me know if you see any issues running with Beam. I haven't tried
>>>>>>> it. I should. In fact Daniel Halperin says my patch should be against
>>>>>>> Beam.
>>>>>>>
>>>>>>> Raghu.
>>>>>>>
>>>>>>> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy
>>>>>>> <[email protected]> wrote:
>>>>>>> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and Raghu for
>>>>>>> pointing me to working code.
>>>>>>>
>>>>>>> I’m in the middle of a hack day at the moment, so the speed of your
>>>>>>> responses has been very welcome.
>>>>>>>
>>>>>>> In the first instance, I’ll try using your changes, Raghu. I’ve
>>>>>>> cloned your repo, switched to the kafka branch and built both
>>>>>>> contrib/kafka and contrib/examples/kafka. The contrib/kafka build
>>>>>>> initially failed with a CheckStyle error
>>>>>>> (/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12:
>>>>>>> 'private' modifier out of order with the JLS suggestions)… I’ve fixed
>>>>>>> that in my local clone and now it’s building fine. I hope to be able to
>>>>>>> run your contrib unchanged on top of the incubator-beam codebase, which
>>>>>>> is what I will attempt to do now.
>>>>>>>
>>>>>>> Thanks again to all, for your swift help.
>>>>>>>
>>>>>>> Bill
>>>>>>>
>>>>>>> On Mar 18, 2016, at 12:55 PM, Raghu Angadi <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi Bill,
>>>>>>>
>>>>>>> We have a fairly well-tested patch for KafkaIO (pr #121). It will be
>>>>>>> merged soon. The example there keeps track of top hashtags in a
>>>>>>> 10-minute sliding window and writes the results to another Kafka topic.
>>>>>>> Please try it if you can. It is well tested on Google Cloud Dataflow.
>>>>>>> I have not run it using the Flink runner.
>>>>>>>
>>>>>>> Raghu.
>>>>>>>
>>>>>>> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas
>>>>>>> <[email protected]> wrote:
>>>>>>> Hello Bill,
>>>>>>>
>>>>>>> This is a known limitation of the Flink Runner.
>>>>>>> There is a JIRA issue for that
>>>>>>> https://issues.apache.org/jira/browse/BEAM-127
>>>>>>>
>>>>>>> A wrapper for Flink sinks will come soon and as Beam evolves,
>>>>>>> a more Beam-y solution will come as well.
>>>>>>>
>>>>>>> Kostas
>>>>>>> On Mar 18, 2016, at 5:23 PM, William McCarthy
>>>>>>> <[email protected]> wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I’m trying to write a proof-of-concept which takes messages from
>>>>>>> Kafka, transforms them using Beam on Flink, then pushes the results 
>>>>>>> onto a
>>>>>>> different Kafka topic.
>>>>>>>
>>>>>>> I’ve used the KafkaWindowedWordCountExample as a starting point,
>>>>>>> and that’s doing the first part of what I want to do, but it outputs to
>>>>>>> text files as opposed to Kafka. FlinkKafkaProducer08 looks promising,
>>>>>>> but I can’t figure out how to plug it into the pipeline. I was thinking
>>>>>>> that it would be wrapped with an UnboundedFlinkSink, or some such, but
>>>>>>> that doesn’t seem to exist.
>>>>>>>
>>>>>>> Any advice or thoughts on what I’m trying to do?
>>>>>>>
>>>>>>> I’m running the latest incubator-beam (as of last night from
>>>>>>> Github), Flink 1.0.0 in cluster mode and Kafka 0.9.0.1, all on Google
>>>>>>> Compute Engine (Debian Jessie).
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Bill McCarthy
>>>>>>>
>>>>>>> --
>>>>>>> Jean-Baptiste Onofré
>>>>>>> [email protected]
>>>>>>> http://blog.nanthrax.net
>>>>>>> Talend - http://www.talend.com
>>>>>>>
>>>>>>
>>>>>
>>>
>>
