Thanks Max,

This command now works, thanks:

$ flink run -c \
    com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample \
    target/beam-1.0-SNAPSHOT.jar \
    --runner=org.apache.beam.runners.flink.FlinkPipelineRunner \
    --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out \
    --streaming=true
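
For anyone hitting the same thing: as I understand Aljoscha's explanation further down the thread, the runner translates the pipeline either as a batch job or as a streaming job, and the batch path rejects unbounded sources. Below is a minimal, self-contained Java sketch of that decision as I picture it. It is not the Flink runner's actual code: Step, translate and parseStreaming are names I made up for illustration, and defaulting to batch when the flag is absent is my assumption based on the behaviour I saw.

```java
import java.util.Arrays;
import java.util.List;

class StreamingFlagSketch {
    // Hypothetical stand-in for a pipeline step; not a Beam or Flink class.
    static final class Step {
        final String name;
        final boolean unbounded;

        Step(String name, boolean unbounded) {
            this.name = name;
            this.unbounded = unbounded;
        }
    }

    // Picks the translation mode. In batch mode an unbounded read is rejected,
    // mirroring the UnsupportedOperationException quoted later in this thread.
    static String translate(List<Step> steps, boolean streaming) {
        if (streaming) {
            return "streaming";
        }
        for (Step step : steps) {
            if (step.unbounded) {
                throw new UnsupportedOperationException(
                        "The transform " + step.name + " is currently not supported.");
            }
        }
        return "batch";
    }

    // Reads a --streaming=<boolean> argument; assumed to default to batch (false).
    static boolean parseStreaming(String[] args) {
        String prefix = "--streaming=";
        for (String arg : args) {
            if (arg.startsWith(prefix)) {
                return Boolean.parseBoolean(arg.substring(prefix.length()));
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<Step> steps = Arrays.asList(new Step("Read(UnboundedKafkaSource)", true));
        System.out.println(translate(steps, parseStreaming(new String[] {"--streaming=true"})));
        try {
            translate(steps, parseStreaming(new String[] {}));
        } catch (UnsupportedOperationException e) {
            System.out.println("batch mode rejected: " + e.getMessage());
        }
    }
}
```

With the flag set the sketch takes the streaming path; without it, the unbounded read is rejected, which matches what I saw before adding --streaming=true.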

Note that I’m still unable to use --runner=FlinkPipelineRunner (i.e. without 
the package scope). That behaviour is unexpected to you, given the test that 
you put in a few days ago, right? i.e.

$ flink run -c \
    com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample \
    target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner \
    --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified 'FlinkPipelineRunner', supported pipeline runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner, DirectPipelineRunner]
        at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1486)
        at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:101)
        at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:286)
        at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
        ... 6 more
Caused by: java.lang.ClassNotFoundException: FlinkPipelineRunner
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:264)
        at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1473)
        ... 14 more
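
My reading of this trace, which may well be wrong: the simple name is not in the set of registered runners, so the SDK falls back to Class.forName on the literal string. That fails for a bare class name and succeeds for a fully qualified one. The Java sketch below models that lookup order as I understand it. It is not the SDK's code: the REGISTERED map, the resolve method and the nested stand-in runner classes are all hypothetical, and the Flink stand-in is deliberately left unregistered to mirror my situation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class RunnerLookupSketch {
    // Hypothetical stand-ins for runner classes; not the real SDK types.
    static final class BlockingDataflowPipelineRunner {}
    static final class DataflowPipelineRunner {}
    static final class DirectPipelineRunner {}
    static final class FlinkPipelineRunner {} // deliberately not registered below

    // Hypothetical registry keyed by simple class name.
    static final Map<String, Class<?>> REGISTERED = new LinkedHashMap<>();
    static {
        REGISTERED.put("BlockingDataflowPipelineRunner", BlockingDataflowPipelineRunner.class);
        REGISTERED.put("DataflowPipelineRunner", DataflowPipelineRunner.class);
        REGISTERED.put("DirectPipelineRunner", DirectPipelineRunner.class);
    }

    // Registered simple names win; anything else is treated as a class name
    // and loaded reflectively, which only works if it is fully qualified.
    static Class<?> resolve(String runner) {
        Class<?> registered = REGISTERED.get(runner);
        if (registered != null) {
            return registered;
        }
        try {
            return Class.forName(runner);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException(
                    "Unknown 'runner' specified '" + runner
                            + "', supported pipeline runners " + REGISTERED.keySet(), e);
        }
    }

    public static void main(String[] args) {
        // The fully qualified (binary) name resolves through the reflective fallback.
        System.out.println(resolve(FlinkPipelineRunner.class.getName()).getSimpleName());
        try {
            resolve("FlinkPipelineRunner"); // simple name, not registered
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
            System.out.println("cause: " + e.getCause().getClass().getSimpleName());
        }
    }
}
```

If that model is right, the simple name can only work once the Flink runner actually shows up in the registered set on my classpath, which would explain why only the fully qualified name works for me at the moment.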


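For the archive, here is how I understood Aljoscha's diagnosis of the earlier UnboundedSourceWrapper problem (quoted below): the wrapper held a Reader, which is not Serializable, while the source itself is, so the reader should be created when the source actually runs. A minimal Java sketch of that pattern follows. It is not the code from the pull request; Reader, Source, Wrapper and roundTrip are hypothetical names, and marking the field transient plus creating it lazily in run() is just one way I would expect such a fix to look.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class LazyReaderSketch {
    // Hypothetical reader: holds live state and is NOT Serializable.
    static final class Reader {
        private int next = 0;

        int read() {
            return next++;
        }
    }

    // Hypothetical source: a serializable description that can create readers.
    static final class Source implements Serializable {
        private static final long serialVersionUID = 1L;

        Reader createReader() {
            return new Reader();
        }
    }

    // The wrapper is shipped to workers, so it must serialize cleanly.
    static final class Wrapper implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Source source;
        private transient Reader reader; // skipped by serialization, created in run()

        Wrapper(Source source) {
            this.source = source;
        }

        int run() {
            if (reader == null) {
                reader = source.createReader(); // lazy initialisation on the worker
            }
            return reader.read();
        }
    }

    // Serializes and deserializes the wrapper, as shipping it to a worker would.
    static Wrapper roundTrip(Wrapper wrapper) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(wrapper);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Wrapper) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("wrapper did not survive serialization", e);
        }
    }

    public static void main(String[] args) {
        Wrapper copy = roundTrip(new Wrapper(new Source()));
        System.out.println("first element after round trip: " + copy.run());
    }
}
```

Without the transient modifier, and with the reader created eagerly in the constructor, writeObject would fail with a NotSerializableException, which is the kind of failure I took the original problem to be.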
> On Mar 23, 2016, at 2:42 PM, Maximilian Michels <[email protected]> wrote:
> 
> Hi William,
> 
> It has been merged. Feel free to try again.
> 
> Cheers,
> Max
> 
> On Wed, Mar 23, 2016 at 5:09 PM, Maximilian Michels <[email protected]> wrote:
>> The pull request is here: https://github.com/apache/incubator-beam/pull/69
>> 
>> I'll merge it after the tests have passed and it has been reviewed.
>> 
>> On Wed, Mar 23, 2016 at 3:12 PM, Maximilian Michels <[email protected]> wrote:
>>> Hi William,
>>> 
>>> I started working on a fix and better testing of the
>>> UnboundedSourceWrapper. I'll get back to you shortly with a pull
>>> request that we should be able to merge soon.
>>> 
>>> - Max
>>> 
>>> On Wed, Mar 23, 2016 at 1:33 PM, Aljoscha Krettek <[email protected]> 
>>> wrote:
>>>> Hi,
>>>> as far as I can see from a quick glance the problem is that 
>>>> UnboundedSourceWrapper stores an instance of Reader that it gets from the 
>>>> Source. The Reader is not Serializable while the UnboundedSource is. I 
>>>> think the Reader should be initialized when actually running the source, 
>>>> in the run() method.
>>>> 
>>>> Cheers,
>>>> Aljoscha
>>>>> On 23 Mar 2016, at 13:07, William McCarthy <[email protected]> 
>>>>> wrote:
>>>>> 
>>>>> The first stack trace was from the latest runner, yes. I’ve pulled the 
>>>>> very latest, just now, and still get the same thing.
>>>>> 
>>>>> When I write ‘pulled the very latest’, here’s what I mean (all of the 
>>>>> following commands, except the last, finished with success):
>>>>> 
>>>>> $ cd incubator-beam
>>>>> $ git pull
>>>>> …some output, then success
>>>>> $ git branch
>>>>> * master
>>>>> $ mvn -DskipTests clean install
>>>>> …some output, then success
>>>>> $ cd <my project>
>>>>> $ mvn -DskipTests clean install
>>>>> …some output, then success
>>>>> $ <command as output below>
>>>>> 
>>>>> Bill
>>>>> 
>>>>>> On Mar 22, 2016, at 5:29 AM, Maximilian Michels <[email protected]> wrote:
>>>>>> 
>>>>>> Hi William,
>>>>>> 
>>>>>> Is the first stack trace from the latest master? Using only the simple
>>>>>> class name of the Runner should actually work now. I've also added a
>>>>>> test to explicitly test that.
>>>>>> 
>>>>>> The latter error, as Aljoscha pointed out, is due to batch
>>>>>> execution. Perhaps we could make it explicit during startup which mode
>>>>>> we're executing in. I filed a JIRA issue for that:
>>>>>> https://issues.apache.org/jira/browse/BEAM-139
>>>>>> 
>>>>>> - Max
>>>>>> 
>>>>>> On Tue, Mar 22, 2016 at 10:00 AM, Aljoscha Krettek <[email protected]> 
>>>>>> wrote:
>>>>>>> Hi,
>>>>>>> for Flink the runner can internally either translate to a batch job or 
>>>>>>> a streaming job. Unbounded sources are not supported when running in 
>>>>>>> batch mode so you have to somehow specify that you want to have 
>>>>>>> streaming mode. StreamingOptions has a method setStreaming; maybe you can 
>>>>>>> specify “--streaming=true” on the command line to set that flag.
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Aljoscha
>>>>>>>> On 21 Mar 2016, at 23:39, William McCarthy <[email protected]> 
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>> I’ve attempted to run the TopHashtagsExample again, using both 
>>>>>>>> ‘FlinkPipelineRunner’ and its fully qualified name. Both bomb out, 
>>>>>>>> though the latter gets further. I hope this helps, here is the output 
>>>>>>>> of both:
>>>>>>>> 
>>>>>>>> $ flink run -c 
>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample 
>>>>>>>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner 
>>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>>>>> 
>>>>>>>> ------------------------------------------------------------
>>>>>>>> The program finished with the following exception:
>>>>>>>> 
>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main 
>>>>>>>> method caused an error.
>>>>>>>>    at 
>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>>>    at 
>>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>>    at 
>>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>>    at 
>>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>>    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>>    at 
>>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>>>    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' 
>>>>>>>> specified 'FlinkPipelineRunner', supported pipeline runners 
>>>>>>>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner, 
>>>>>>>> DirectPipelineRunner]
>>>>>>>>    at 
>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1486)
>>>>>>>>    at 
>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:101)
>>>>>>>>    at 
>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:286)
>>>>>>>>    at 
>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>>>>>    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>    at 
>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>    at 
>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>    at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>    at 
>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>>    ... 6 more
>>>>>>>> Caused by: java.lang.ClassNotFoundException: FlinkPipelineRunner
>>>>>>>>    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>>>>    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>>>>    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>>>>    at java.lang.Class.forName0(Native Method)
>>>>>>>>    at java.lang.Class.forName(Class.java:264)
>>>>>>>>    at 
>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1473)
>>>>>>>>    ... 14 more
>>>>>>>> 
>>>>>>>> 
>>>>>>>> $ flink run -c 
>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample 
>>>>>>>> target/beam-1.0-SNAPSHOT.jar 
>>>>>>>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner 
>>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>>>>> 
>>>>>>>> ------------------------------------------------------------
>>>>>>>> The program finished with the following exception:
>>>>>>>> 
>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main 
>>>>>>>> method caused an error.
>>>>>>>>    at 
>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>>>    at 
>>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>>    at 
>>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>>    at 
>>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>>    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>>    at 
>>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>>>    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>>>> Caused by: java.lang.UnsupportedOperationException: The transform 
>>>>>>>> Read(UnboundedKafkaSource) is currently not supported.
>>>>>>>>    at 
>>>>>>>> org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator.visitTransform(FlinkBatchPipelineTranslator.java:111)
>>>>>>>>    at 
>>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>>>>>>>>    at 
>>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>>>>    at 
>>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>>>>    at 
>>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>>>>>>>>    at 
>>>>>>>> com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>>>>>>>>    at 
>>>>>>>> org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:34)
>>>>>>>>    at 
>>>>>>>> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:130)
>>>>>>>>    at 
>>>>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:109)
>>>>>>>>    at 
>>>>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:50)
>>>>>>>>    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>>>>>>>>    at 
>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>>>>>>>>    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>    at 
>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>    at 
>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>    at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>    at 
>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>>    ... 6 more
>>>>>>>> 
>>>>>>>> 
>>>>>>>>> On Mar 21, 2016, at 12:23 PM, Raghu Angadi <[email protected]> wrote:
>>>>>>>>> 
>>>>>>>>> Thanks Max.
>>>>>>>>> 
>>>>>>>>> Bill McCarthy,
>>>>>>>>> I know you are unblocked and KafkaWriter is good enough. Please try 
>>>>>>>>> KafkaIO source from my branch with Flink runner if you get a chance.
>>>>>>>>> 
>>>>>>>>> thanks,
>>>>>>>>> Raghu.
>>>>>>>>> 
>>>>>>>>> On Mon, Mar 21, 2016 at 6:54 AM, Jean-Baptiste Onofré 
>>>>>>>>> <[email protected]> wrote:
>>>>>>>>> Thanks for the update Max !
>>>>>>>>> 
>>>>>>>>> Regards
>>>>>>>>> JB
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On 03/21/2016 02:39 PM, Maximilian Michels wrote:
>>>>>>>>> FYI: The Runner registration has been fixed. The Flink runner
>>>>>>>>> explicitly registers as of [1]. Also, the SDK tries to look up the
>>>>>>>>> PipelineRunner class in case it has not been registered [2].
>>>>>>>>> 
>>>>>>>>> [1] https://github.com/apache/incubator-beam/pull/40
>>>>>>>>> [2] https://github.com/apache/incubator-beam/pull/61
>>>>>>>>> 
>>>>>>>>> On Sat, Mar 19, 2016 at 6:43 PM, Maximilian Michels <[email protected]> 
>>>>>>>>> wrote:
>>>>>>>>> Great to see such a lively discussion here.
>>>>>>>>> 
>>>>>>>>> I think we'll support sinks through the Write interface (like in
>>>>>>>>> batched execution) and also have a dedicated wrapper for the Flink
>>>>>>>>> sinks. This is a very pressing but easy to solve issue of the Flink
>>>>>>>>> runner. Expect it to be in next week.
>>>>>>>>> 
>>>>>>>>> Also, the proper registration of the runner is about to be merged.
>>>>>>>>> We just need an ok from the contributor to merge the changes.
>>>>>>>>> 
>>>>>>>>> Best,
>>>>>>>>> Max
>>>>>>>>> 
>>>>>>>>> On Sat, Mar 19, 2016 at 12:42 AM, Dan Halperin <[email protected]> 
>>>>>>>>> wrote:
>>>>>>>>> Thanks Bill!
>>>>>>>>> 
>>>>>>>>> Filed https://issues.apache.org/jira/browse/BEAM-136, but I'm glad 
>>>>>>>>> it's not
>>>>>>>>> blocking you!
>>>>>>>>> 
>>>>>>>>> On Fri, Mar 18, 2016 at 4:04 PM, William McCarthy
>>>>>>>>> <[email protected]> wrote:
>>>>>>>>> 
>>>>>>>>> I tried that, but still no dice: Just to be clear, it’s not a blocker 
>>>>>>>>> for
>>>>>>>>> me, given that I have my example running, but for your information the
>>>>>>>>> exception is below.
>>>>>>>>> 
>>>>>>>>> I’ll watch the commit log on the beam incubator and look forward to
>>>>>>>>> deleting my copy of Raghu’s contributions when they’re merged to 
>>>>>>>>> master.
>>>>>>>>> 
>>>>>>>>> Thanks again for everyone’s help,
>>>>>>>>> 
>>>>>>>>> Bill
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Command followed by exception:
>>>>>>>>> 
>>>>>>>>> $ flink run -c
>>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>>>>>> target/beam-1.0-SNAPSHOT.jar
>>>>>>>>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner
>>>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in 
>>>>>>>>> --outputTopic=test_out
>>>>>>>>> 
>>>>>>>>> ------------------------------------------------------------
>>>>>>>>> The program finished with the following exception:
>>>>>>>>> 
>>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>>>>>> method caused an error.
>>>>>>>>> at
>>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' 
>>>>>>>>> specified
>>>>>>>>> 'org.apache.beam.runners.flink.FlinkPipelineRunner', supported 
>>>>>>>>> pipeline
>>>>>>>>> runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
>>>>>>>>> DirectPipelineRunner]
>>>>>>>>> at
>>>>>>>>> com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>>>>>>>>> at
>>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>>>>>>>>> at
>>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>>>>>>>>> at
>>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>>>>>>>>> at
>>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>> at
>>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>> at
>>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>>> ... 6 more
>>>>>>>>> 
>>>>>>>>> On Mar 18, 2016, at 5:35 PM, Thomas Groh <[email protected]> wrote:
>>>>>>>>> 
>>>>>>>>> I don't believe the FlinkPipelineRunner is registered the same way the
>>>>>>>>> Dataflow & Direct Pipeline runners are registered; using
>>>>>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner should work
>>>>>>>>> 
>>>>>>>>> On Fri, Mar 18, 2016 at 2:10 PM, William McCarthy
>>>>>>>>> <[email protected]> wrote:
>>>>>>>>> 
>>>>>>>>> Thanks Dan,
>>>>>>>>> 
>>>>>>>>> I tried that, but getting the below. Note that the jar contains the
>>>>>>>>> FlinkPipelineRunner.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> % jar -tf target/beam-1.0-SNAPSHOT.jar | grep FlinkPipeline
>>>>>>>>> org/apache/beam/runners/flink/FlinkPipelineRunner.class
>>>>>>>>> org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.class
>>>>>>>>> org/apache/beam/runners/flink/FlinkPipelineOptions.class
>>>>>>>>> org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.class
>>>>>>>>> 
>>>>>>>>> % flink run -c
>>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>>>>>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner
>>>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in 
>>>>>>>>> --outputTopic=test_out
>>>>>>>>> 
>>>>>>>>> ------------------------------------------------------------
>>>>>>>>> The program finished with the following exception:
>>>>>>>>> 
>>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>>>>>> method caused an error.
>>>>>>>>> at
>>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' 
>>>>>>>>> specified
>>>>>>>>> 'FlinkPipelineRunner', supported pipeline runners
>>>>>>>>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
>>>>>>>>> DirectPipelineRunner]
>>>>>>>>> at
>>>>>>>>> com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>>>>>>>>> at
>>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>>>>>>>>> at
>>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>>>>>>>>> at
>>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>>>>>>>>> at
>>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>> at
>>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>> at
>>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>> at
>>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>>> ... 6 more
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Mar 18, 2016, at 5:00 PM, Dan Halperin <[email protected]> wrote:
>>>>>>>>> 
>>>>>>>>> Thanks for catching that, Aljoscha!
>>>>>>>>> 
>>>>>>>>> Note that the Flink runner should be available via a command-line 
>>>>>>>>> option
>>>>>>>>> as well: --runner=FlinkPipelineRunner.
>>>>>>>>> 
>>>>>>>>> The list of valid values for that flag is computed by walking the
>>>>>>>>> classpath at runtime, so as long as the Flink jar is present it'll 
>>>>>>>>> work.
>>>>>>>>> 
>>>>>>>>> On Fri, Mar 18, 2016 at 1:21 PM, Aljoscha Krettek 
>>>>>>>>> <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>> Hi,
>>>>>>>>> looks like the example is being executed with the DirectPipelineRunner
>>>>>>>>> which does not seem to be able to cope with UnboundedSource. You need 
>>>>>>>>> to set
>>>>>>>>> the runner to the FlinkRunner in the example code as described here:
>>>>>>>>> https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example
>>>>>>>>> 
>>>>>>>>> The Flink runner should be able to deal with UnboundedSource but has 
>>>>>>>>> the
>>>>>>>>> limitation that sources are always parallelism=1 (this is being 
>>>>>>>>> worked on,
>>>>>>>>> however).
>>>>>>>>> 
>>>>>>>>> Cheers,
>>>>>>>>> Aljoscha
>>>>>>>>> On 18 Mar 2016, at 20:56, Dan Halperin <[email protected]> wrote:
>>>>>>>>> 
>>>>>>>>> Looks like the Flink runner may not yet support arbitrary code written
>>>>>>>>> with the UnboundedSource API. That is, it looks like the Flink runner
>>>>>>>>> expects the sources to get translated away.
>>>>>>>>> 
>>>>>>>>> Max?
>>>>>>>>> 
>>>>>>>>> Dan
>>>>>>>>> 
>>>>>>>>> On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy
>>>>>>>>> <[email protected]> wrote:
>>>>>>>>> Thanks Raghu,
>>>>>>>>> 
>>>>>>>>> When I try to run it on flink using the incubator-beam code, i.e.
>>>>>>>>> 
>>>>>>>>> flink run -c
>>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>>>>>> target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092
>>>>>>>>> --topics=test_in --outputTopic=test_out
>>>>>>>>> 
>>>>>>>>> I get this:
>>>>>>>>> 
>>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>>>>>> method caused an error.
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>>>>> Caused by: java.lang.IllegalStateException: no evaluator registered
>>>>>>>>> for Read(UnboundedKafkaSource)
>>>>>>>>>     at
>>>>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852)
>>>>>>>>>     at
>>>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>>>>>>>>>     at
>>>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>>>>>     at
>>>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>>>>>     at
>>>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>>>>>>>>>     at
>>>>>>>>> com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>>>>>>>>>     at
>>>>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
>>>>>>>>>     at
>>>>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
>>>>>>>>>     at
>>>>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
>>>>>>>>>     at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>>>>>>>>>     at
>>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>>     at
>>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>>     at
>>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>>     at
>>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>>>     ... 6 more
>>>>>>>>> 
>>>>>>>>> Any ideas?
>>>>>>>>> 
>>>>>>>>> Bill
>>>>>>>>> 
>>>>>>>>> On Mar 18, 2016, at 2:47 PM, Raghu Angadi <[email protected]> wrote:
>>>>>>>>> 
>>>>>>>>> Thanks for trying it.
>>>>>>>>> 
>>>>>>>>> I fixed the CheckStyle error  (not sure why my build is not failing).
>>>>>>>>> Let me know if you see any issues running with Beam. I haven't tried 
>>>>>>>>> it. I
>>>>>>>>> should. In fact Daniel Halperin says my patch should be against Beam.
>>>>>>>>> 
>>>>>>>>> Raghu.
>>>>>>>>> 
>>>>>>>>> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy
>>>>>>>>> <[email protected]> wrote:
>>>>>>>>> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and Raghu for
>>>>>>>>> pointing me to working code.
>>>>>>>>> 
>>>>>>>>> I’m in the middle of a hack day at the moment, so the speed of your
>>>>>>>>> responses has been very welcome.
>>>>>>>>> 
>>>>>>>>> In the first instance, I’ll try using your changes, Raghu. I’ve
>>>>>>>>> cloned your repo, switched to the kafka branch and built both 
>>>>>>>>> contrib/kafka
>>>>>>>>> and contrib/examples/kafka. The contrib/kafka initially failed with a
>>>>>>>>> CheckStyle error
>>>>>>>>> (/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12:
>>>>>>>>> 'private' modifier out of order with the JLS suggestions)… I’ve fixed 
>>>>>>>>> that
>>>>>>>>> in my local clone and now it’s building fine. I hope to be able to 
>>>>>>>>> run your
>>>>>>>>> contrib unchanged on top of the incubator-beam codebase, which will 
>>>>>>>>> be what
>>>>>>>>> I attempt to do now.
>>>>>>>>> 
>>>>>>>>> Thanks again to all, for your swift help.
>>>>>>>>> 
>>>>>>>>> Bill
>>>>>>>>> 
>>>>>>>>> On Mar 18, 2016, at 12:55 PM, Raghu Angadi <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>> Hi Bill,
>>>>>>>>> 
>>>>>>>>> We have a fairly well-tested patch for KafkaIO (pr #121). It will be
>>>>>>>>> merged soon. The example there keeps track of top hashtags in 10 
>>>>>>>>> minute
>>>>>>>>> sliding window and writes the results to another Kafka topic. Please 
>>>>>>>>> try it
>>>>>>>>> if you can. It is well tested on Google Cloud Dataflow. I have not 
>>>>>>>>> run it
>>>>>>>>> using Flink runner.
>>>>>>>>> 
>>>>>>>>> Raghu.
>>>>>>>>> 
>>>>>>>>> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas
>>>>>>>>> <[email protected]> wrote:
>>>>>>>>> Hello Bill,
>>>>>>>>> 
>>>>>>>>> This is a known limitation of the Flink Runner.
>>>>>>>>> There is a JIRA issue for that
>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-127
>>>>>>>>> 
>>>>>>>>> A wrapper for Flink sinks will come soon and as Beam evolves,
>>>>>>>>> a more Beam-y solution will come as well.
>>>>>>>>> 
>>>>>>>>> Kostas
>>>>>>>>> On Mar 18, 2016, at 5:23 PM, William McCarthy
>>>>>>>>> <[email protected]> wrote:
>>>>>>>>> 
>>>>>>>>> Hi,
>>>>>>>>> 
>>>>>>>>> I’m trying to write a proof-of-concept which takes messages from
>>>>>>>>> Kafka, transforms them using Beam on Flink, then pushes the results 
>>>>>>>>> onto a
>>>>>>>>> different Kafka topic.
>>>>>>>>> 
>>>>>>>>> I’ve used the KafkaWindowedWordCountExample as a starting point,
>>>>>>>>> and that’s doing the first part of what I want to do, but it outputs 
>>>>>>>>> to text
>>>>>>>>> files as opposed to Kafka. FlinkKafkaProducer08 looks promising, but 
>>>>>>>>> I can’t
>>>>>>>>> figure out how to plug it into the pipeline. I was thinking that it 
>>>>>>>>> would be
>>>>>>>>> wrapped with an UnboundedFlinkSink, or some such, but that doesn’t 
>>>>>>>>> seem to
>>>>>>>>> exist.
>>>>>>>>> 
>>>>>>>>> Any advice or thoughts on what I’m trying to do?
>>>>>>>>> 
>>>>>>>>> I’m running the latest incubator-beam (as of last night from
>>>>>>>>> Github), Flink 1.0.0 in cluster mode and Kafka 0.9.0.1, all on Google
>>>>>>>>> Compute Engine (Debian Jessie).
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> 
>>>>>>>>> Bill McCarthy
>>>>>>>>> 
>>>>>>>>> --
>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>> [email protected]
>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>> Talend - http://www.talend.com
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>>> 
