Hi,
as far as I can see from a quick glance the problem is that 
UnboundedSourceWrapper stores an instance of Reader that it gets from the 
Source. The Reader is not Serializable while the UnboundedSource is. I think 
the Reader should be initialized when actually running the source, in the run() 
method.

Cheers,
Aljoscha
> On 23 Mar 2016, at 13:07, William McCarthy <[email protected]> wrote:
> 
> The first stack trace was from the latest runner, yes. I’ve pulled the very 
> latest, just now, and still get the same thing.
> 
> When I write ‘pulled the very latest’, here’s what I mean (all of the 
> following commands, except the latest, finished with success):
> 
> $ cd incubator-beam
> $ git pull
> …some output, then success
> $ git branch
> * master
> $ mvn -DskipTests clean install
> …some output, then success
> $ cd <my project>
> $ mvn -DskipTests clean install
> …some output, then success
> $ <command as output below>
> 
> Bill
> 
>> On Mar 22, 2016, at 5:29 AM, Maximilian Michels <[email protected]> wrote:
>> 
>> Hi William,
>> 
>> Is the first stack trace from the latest master? Using only the simple
>> class name of the Runner should actually work now. I've also added a
>> test to explicitly test that.
>> 
>> The latter error, as Aljoscha pointed out, this due to batch
>> execution. Perhaps we could make it explicit during startup which mode
>> we're executing in. File a JIRA issue for that:
>> https://issues.apache.org/jira/browse/BEAM-139
>> 
>> - Max
>> 
>> On Tue, Mar 22, 2016 at 10:00 AM, Aljoscha Krettek <[email protected]> 
>> wrote:
>>> Hi,
>>> for Flink the runner can internally either translate to a batch job or a 
>>> streaming job. Unbounded sources are not supported when running in batch 
>>> mode so you have to somehow specify that you want to have streaming mode. 
>>> StreamingOptions has method setStreaming, maybe you can specify “—streaming 
>>> true” on the command line to set that flag.
>>> 
>>> Cheers,
>>> Aljoscha
>>>> On 21 Mar 2016, at 23:39, William McCarthy <[email protected]> 
>>>> wrote:
>>>> 
>>>> I’ve attempted to run the TopHashtagsExample again, using both 
>>>> ‘FlinkPipelineRunner’ and it's fully qualified name. Both bomb out, though 
>>>> the latter gets further. I hope this helps, here is the output of both:
>>>> 
>>>> $ flink run -c 
>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample 
>>>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner 
>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>> 
>>>> ------------------------------------------------------------
>>>> The program finished with the following exception:
>>>> 
>>>> org.apache.flink.client.program.ProgramInvocationException: The main 
>>>> method caused an error.
>>>>     at 
>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>     at 
>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>     at 
>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>     at 
>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified 
>>>> 'FlinkPipelineRunner', supported pipeline runners 
>>>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner, 
>>>> DirectPipelineRunner]
>>>>     at 
>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1486)
>>>>     at 
>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:101)
>>>>     at 
>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:286)
>>>>     at 
>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at 
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>     at 
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>     at 
>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>     ... 6 more
>>>> Caused by: java.lang.ClassNotFoundException: FlinkPipelineRunner
>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>     at java.lang.Class.forName0(Native Method)
>>>>     at java.lang.Class.forName(Class.java:264)
>>>>     at 
>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1473)
>>>>     ... 14 more
>>>> 
>>>> 
>>>> $ flink run -c 
>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample 
>>>> target/beam-1.0-SNAPSHOT.jar 
>>>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner 
>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>> 
>>>> ------------------------------------------------------------
>>>> The program finished with the following exception:
>>>> 
>>>> org.apache.flink.client.program.ProgramInvocationException: The main 
>>>> method caused an error.
>>>>     at 
>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>     at 
>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>     at 
>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>     at 
>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>> Caused by: java.lang.UnsupportedOperationException: The transform 
>>>> Read(UnboundedKafkaSource) is currently not supported.
>>>>     at 
>>>> org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator.visitTransform(FlinkBatchPipelineTranslator.java:111)
>>>>     at 
>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>>>>     at 
>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>     at 
>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>     at 
>>>> com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>>>>     at 
>>>> com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>>>>     at 
>>>> org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:34)
>>>>     at 
>>>> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:130)
>>>>     at 
>>>> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:109)
>>>>     at 
>>>> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:50)
>>>>     at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>>>>     at 
>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at 
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>     at 
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>     at 
>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>     ... 6 more
>>>> 
>>>> 
>>>>> On Mar 21, 2016, at 12:23 PM, Raghu Angadi <[email protected]> wrote:
>>>>> 
>>>>> Thanks Max.
>>>>> 
>>>>> Bill McCarthy,
>>>>> I know you are unblocked and KafkaWriter is good enough. Please try 
>>>>> KafkaIO source from my branch with Flink runner if you get a chance.
>>>>> 
>>>>> thanks,
>>>>> Raghu.
>>>>> 
>>>>> On Mon, Mar 21, 2016 at 6:54 AM, Jean-Baptiste Onofré <[email protected]> 
>>>>> wrote:
>>>>> Thanks for the update Max !
>>>>> 
>>>>> Regards
>>>>> JB
>>>>> 
>>>>> 
>>>>> On 03/21/2016 02:39 PM, Maximilian Michels wrote:
>>>>> FYI: The Runner registration has been fixed. The Flink runner
>>>>> explicitly registers as of [1]. Also, the SDK tries to look up the
>>>>> PipelineRunner class in case it has not been registered [2].
>>>>> 
>>>>> [1] https://github.com/apache/incubator-beam/pull/40
>>>>> [2] https://github.com/apache/incubator-beam/pull/61
>>>>> 
>>>>> On Sat, Mar 19, 2016 at 6:43 PM, Maximilian Michels <[email protected]> 
>>>>> wrote:
>>>>> Great to see such a lively discussion here.
>>>>> 
>>>>> I think we'll support sinks through the Write interface (like in
>>>>> batched execution) and also have a dedicated wrapper for the Flink
>>>>> sinks. This is a very pressing but easy to solve issue of the Flink
>>>>> runner. Expect it to be in next week.
>>>>> 
>>>>> Also, the proper registration of the runner is about to to be merged.
>>>>> We just need an ok from the contributor to merge the changes.
>>>>> 
>>>>> Best,
>>>>> Max
>>>>> 
>>>>> On Sat, Mar 19, 2016 at 12:42 AM, Dan Halperin <[email protected]> 
>>>>> wrote:
>>>>> Thanks Bill!
>>>>> 
>>>>> Filed https://issues.apache.org/jira/browse/BEAM-136, but I'm glad it's 
>>>>> not
>>>>> blocking you!
>>>>> 
>>>>> On Fri, Mar 18, 2016 at 4:04 PM, William McCarthy
>>>>> <[email protected]> wrote:
>>>>> 
>>>>> I tried that, but still no dice: Just to be clear, it’s not a blocker for
>>>>> me, given that I have my example running, but for your information the
>>>>> exception is below.
>>>>> 
>>>>> I’ll watch the commit log on the beam incubator and look forward to
>>>>> deleting my copy of Raghu’s contributions when they’re merger to master.
>>>>> 
>>>>> Thanks again for everyone’s help,
>>>>> 
>>>>> Bill
>>>>> 
>>>>> 
>>>>> Command followed by exception:
>>>>> 
>>>>> $ flink run -c
>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>> target/beam-1.0-SNAPSHOT.jar
>>>>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner
>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>> 
>>>>> ------------------------------------------------------------
>>>>> The program finished with the following exception:
>>>>> 
>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>> method caused an error.
>>>>> at
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>> at
>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>> at
>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>> at
>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified
>>>>> 'org.apache.beam.runners.flink.FlinkPipelineRunner', supported pipeline
>>>>> runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
>>>>> DirectPipelineRunner]
>>>>> at
>>>>> com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>>>>> at
>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>>>>> at
>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>>>>> at
>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>>>>> at
>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at
>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>> at
>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>> at
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>> ... 6 more
>>>>> 
>>>>> On Mar 18, 2016, at 5:35 PM, Thomas Groh <[email protected]> wrote:
>>>>> 
>>>>> I don't believe the FlinkPipelineRunner is registered the same way the
>>>>> Dataflow & Direct Pipeline runners are registered; using
>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner should work
>>>>> 
>>>>> On Fri, Mar 18, 2016 at 2:10 PM, William McCarthy
>>>>> <[email protected]> wrote:
>>>>> 
>>>>> Thanks Dan,
>>>>> 
>>>>> I tried that, but getting the below. Note that the jar contains the
>>>>> FlinkPipelineRunner.
>>>>> 
>>>>> 
>>>>> 
>>>>> % jar -tf target/beam-1.0-SNAPSHOT.jar | grep FlinkPipeline
>>>>> org/apache/beam/runners/flink/FlinkPipelineRunner.class
>>>>> org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.class
>>>>> org/apache/beam/runners/flink/FlinkPipelineOptions.class
>>>>> org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.class
>>>>> 
>>>>> % flink run -c
>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner
>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>> 
>>>>> ------------------------------------------------------------
>>>>> The program finished with the following exception:
>>>>> 
>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>> method caused an error.
>>>>> at
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>> at
>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>> at
>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>> at
>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified
>>>>> 'FlinkPipelineRunner', supported pipeline runners
>>>>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
>>>>> DirectPipelineRunner]
>>>>> at
>>>>> com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>>>>> at
>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>>>>> at
>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>>>>> at
>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>>>>> at
>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at
>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>> at
>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>> at
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>> ... 6 more
>>>>> 
>>>>> 
>>>>> 
>>>>> On Mar 18, 2016, at 5:00 PM, Dan Halperin <[email protected]> wrote:
>>>>> 
>>>>> Thanks for catching that, Aljoscha!
>>>>> 
>>>>> Note that the Flink runner should be available via a command-line option
>>>>> as well: --runner=FlinkPipelineRunner.
>>>>> 
>>>>> The list of valid values for that flag is computed by walking the
>>>>> classpath at runtime, so as long as the Flink jar is present it'll work.
>>>>> 
>>>>> On Fri, Mar 18, 2016 at 1:21 PM, Aljoscha Krettek <[email protected]>
>>>>> wrote:
>>>>> 
>>>>> Hi,
>>>>> looks like the example is being executed with the DirectPipelineRunner
>>>>> which does not seem to be able to cope with UnboundedSource. You need to 
>>>>> set
>>>>> the runner to the FlinkRunner in the example code as described here:
>>>>> https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example
>>>>> 
>>>>> The Flink runner should be able to deal with UnboundedSource but has the
>>>>> limitation that sources are always parallelism=1 (this is being worked on,
>>>>> however).
>>>>> 
>>>>> Cheers,
>>>>> Aljoscha
>>>>> On 18 Mar 2016, at 20:56, Dan Halperin <[email protected]> wrote:
>>>>> 
>>>>> Looks like the Flink runner may not yet support arbitrary code written
>>>>> with the UnboundedSource API. That is, it looks like the Flink runner
>>>>> expects the sources to get translated away.
>>>>> 
>>>>> Max?
>>>>> 
>>>>> Dan
>>>>> 
>>>>> On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy
>>>>> <[email protected]> wrote:
>>>>> Thanks Raghu,
>>>>> 
>>>>> When I try to run it on flink using the incubator-beam code, i.e.
>>>>> 
>>>>> flink run -c
>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>> target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092
>>>>> --topics=test_in --outputTopic=test_out
>>>>> 
>>>>> I get this:
>>>>> 
>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>> method caused an error.
>>>>>      at
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>      at
>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>      at
>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>      at
>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>      at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>      at
>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>      at
>>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>> Caused by: java.lang.IllegalStateException: no evaluator registered
>>>>> for Read(UnboundedKafkaSource)
>>>>>      at
>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852)
>>>>>      at
>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>>>>>      at
>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>      at
>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>      at
>>>>> com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>>>>>      at
>>>>> com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>>>>>      at
>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
>>>>>      at
>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
>>>>>      at
>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
>>>>>      at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>>>>>      at
>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>>>>>      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>      at
>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>      at
>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>      at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>      at
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>      ... 6 more
>>>>> 
>>>>> Any ideas?
>>>>> 
>>>>> Bill
>>>>> 
>>>>> On Mar 18, 2016, at 2:47 PM, Raghu Angadi <[email protected]> wrote:
>>>>> 
>>>>> Thanks for trying it.
>>>>> 
>>>>> I fixed the CheckStyle error  (not sure why my build is not failing).
>>>>> Let me know if you see any issues running with Beam. I haven't tried it. I
>>>>> should. In fact Daniel Halperin says my patch should be against Beam..
>>>>> 
>>>>> Raghu.
>>>>> 
>>>>> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy
>>>>> <[email protected]> wrote:
>>>>> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and Raghu for
>>>>> pointing me to working code.
>>>>> 
>>>>> I’m in the middle of a hack day at the moment, so the speed of your
>>>>> responses has been very welcome.
>>>>> 
>>>>> In the first instance, I’ll try using your changes, Raghu. I’ve
>>>>> cloned your repo, switched to the kafka branch and built both 
>>>>> contrib/kafka
>>>>> and contrib/examples/kafka. The contrib/kafka initially failed with a
>>>>> CheckStyle error
>>>>> (/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12:
>>>>> 'private' modifier out of order with the JLS suggestions)… I’ve fixed that
>>>>> in my local clone and now it’s building fine. I hope to be able to run 
>>>>> your
>>>>> contrib unchanged on top of the incubator-beam codebase, which will be 
>>>>> what
>>>>> I attempt to do now.
>>>>> 
>>>>> Thanks again to all, for your swift help.
>>>>> 
>>>>> Bill
>>>>> 
>>>>> On Mar 18, 2016, at 12:55 PM, Raghu Angadi <[email protected]>
>>>>> wrote:
>>>>> 
>>>>> Hi Bill,
>>>>> 
>>>>> We have fairly well tested patch for KafkaIO (pr #121). It will be
>>>>> merged soon. The example there keeps track of top hashtags in 10 minute
>>>>> sliding window and writes the results to another Kafka topic. Please try 
>>>>> it
>>>>> if you can. It is well tested on Google Cloud Dataflow. I have not run it
>>>>> using Flink runner.
>>>>> 
>>>>> Raghu.
>>>>> 
>>>>> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas
>>>>> <[email protected]> wrote:
>>>>> Hello Bill,
>>>>> 
>>>>> This is a known limitation of the Flink Runner.
>>>>> There is a JIRA issue for that
>>>>> https://issues.apache.org/jira/browse/BEAM-127
>>>>> 
>>>>> A wrapper for Flink sinks will come soon and as Beam evolves,
>>>>> a more Beam-y solution will come as well.
>>>>> 
>>>>> Kostas
>>>>> On Mar 18, 2016, at 5:23 PM, William McCarthy
>>>>> <[email protected]> wrote:
>>>>> 
>>>>> Hi,
>>>>> 
>>>>> I’m trying to write a proof-of-concept which takes messages from
>>>>> Kafka, transforms them using Beam on Flink, then pushes the results onto a
>>>>> different Kafka topic.
>>>>> 
>>>>> I’ve used the KafkaWindowedWordCountExample as a starting point,
>>>>> and that’s doing the first part of what I want to do, but it outputs to 
>>>>> text
>>>>> files as opposed to Kafka. FlinkKafkaProducer08 looks promising, but I 
>>>>> can’t
>>>>> figure out how to plug it into the pipeline. I was thinking that it would 
>>>>> be
>>>>> wrapped with an UnboundedFlinkSink, or some such, but that doesn’t seem to
>>>>> exist.
>>>>> 
>>>>> Any advice or thoughts on what I’m trying to do?
>>>>> 
>>>>> I’m running the latest incubator-beam (as of last night from
>>>>> Github), Flink 1.0.0 in cluster mode and Kafka 0.9.0.1, all on Google
>>>>> Compute Engine (Debian Jessie).
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> Bill McCarthy
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> Jean-Baptiste Onofré
>>>>> [email protected]
>>>>> http://blog.nanthrax.net
>>>>> Talend - http://www.talend.com
>>>>> 
>>>> 
>>> 
> 

Reply via email to