Hi William,

It has been merged. Feel free to try again.

Cheers,
Max

On Wed, Mar 23, 2016 at 5:09 PM, Maximilian Michels <[email protected]> wrote:
> The pull request is here: https://github.com/apache/incubator-beam/pull/69
>
> I'll merge it after the tests have passed and it has been reviewed.
>
> On Wed, Mar 23, 2016 at 3:12 PM, Maximilian Michels <[email protected]> wrote:
>> Hi William,
>>
>> I started working on a fix and better testing of the
>> UnboundedSourceWrapper. I'll get back to you shortly with a pull
>> request that we should be able to merge soon.
>>
>> - Max
>>
>> On Wed, Mar 23, 2016 at 1:33 PM, Aljoscha Krettek <[email protected]> 
>> wrote:
>>> Hi,
>>> as far as I can see from a quick glance the problem is that 
>>> UnboundedSourceWrapper stores an instance of Reader that it gets from the 
>>> Source. The Reader is not Serializable while the UnboundedSource is. I 
>>> think the Reader should be initialized when actually running the source, in 
>>> the run() method.
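A minimal sketch of the pattern described above, in plain Java with no Beam or Flink dependency. Every name here (LazyReaderSketch, Source, Reader, Wrapper, roundTrip) is a hypothetical stand-in and not the actual UnboundedSourceWrapper code. The idea is that the serializable wrapper keeps only the serializable source as a field and creates the non-serializable reader inside run():

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class LazyReaderSketch {

  /** Hypothetical reader: deliberately NOT Serializable. */
  interface Reader {
    String next();
  }

  /** Hypothetical source: Serializable, and able to create a Reader on demand. */
  static class Source implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String topic;

    Source(String topic) {
      this.topic = topic;
    }

    Reader createReader() {
      return new Reader() {
        private int count = 0;

        @Override
        public String next() {
          return topic + "-" + count++;
        }
      };
    }
  }

  /** Hypothetical wrapper: holds only the source, never a Reader field. */
  static class Wrapper implements Serializable {
    private static final long serialVersionUID = 1L;
    private final Source source;

    Wrapper(Source source) {
      this.source = source;
    }

    /** The reader is created here, after the wrapper has been shipped. */
    List<String> run(int n) {
      Reader reader = source.createReader();
      List<String> out = new ArrayList<>();
      for (int i = 0; i < n; i++) {
        out.add(reader.next());
      }
      return out;
    }
  }

  /** Serializes and deserializes the wrapper, as a cluster would when shipping it. */
  static Wrapper roundTrip(Wrapper wrapper) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(wrapper);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (Wrapper) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    Wrapper shipped = roundTrip(new Wrapper(new Source("test_in")));
    System.out.println(shipped.run(3));
  }
}
```

If Wrapper instead stored the Reader in a non-transient field, writeObject would fail with a NotSerializableException, which is the kind of failure discussed here.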
>>>
>>> Cheers,
>>> Aljoscha
>>>> On 23 Mar 2016, at 13:07, William McCarthy <[email protected]> 
>>>> wrote:
>>>>
>>>> The first stack trace was from the latest runner, yes. I’ve pulled the 
>>>> very latest, just now, and still get the same thing.
>>>>
>>>> When I write ‘pulled the very latest’, here’s what I mean (all of the 
>>>> following commands, except the last, finished with success):
>>>>
>>>> $ cd incubator-beam
>>>> $ git pull
>>>> …some output, then success
>>>> $ git branch
>>>> * master
>>>> $ mvn -DskipTests clean install
>>>> …some output, then success
>>>> $ cd <my project>
>>>> $ mvn -DskipTests clean install
>>>> …some output, then success
>>>> $ <command as output below>
>>>>
>>>> Bill
>>>>
>>>>> On Mar 22, 2016, at 5:29 AM, Maximilian Michels <[email protected]> wrote:
>>>>>
>>>>> Hi William,
>>>>>
>>>>> Is the first stack trace from the latest master? Using only the simple
>>>>> class name of the Runner should actually work now. I've also added a
>>>>> test to explicitly test that.
>>>>>
>>>>> The latter error, as Aljoscha pointed out, is due to batch
>>>>> execution. Perhaps we could make it explicit during startup which mode
>>>>> we're executing in. Filed a JIRA issue for that:
>>>>> https://issues.apache.org/jira/browse/BEAM-139
>>>>>
>>>>> - Max
>>>>>
>>>>> On Tue, Mar 22, 2016 at 10:00 AM, Aljoscha Krettek <[email protected]> 
>>>>> wrote:
>>>>>> Hi,
>>>>>> for Flink the runner can internally either translate to a batch job or a 
>>>>>> streaming job. Unbounded sources are not supported when running in batch 
>>>>>> mode, so you have to somehow specify that you want to have streaming 
>>>>>> mode. StreamingOptions has a method setStreaming; maybe you can specify 
>>>>>> “--streaming true” on the command line to set that flag.
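A rough sketch of the batch versus streaming decision described above, in plain Java. Everything in it is hypothetical (ModeSelectionSketch, Mode, modeFromArgs, translate), and the flag spellings it accepts are assumptions rather than the SDK's confirmed syntax. It only illustrates why an unbounded source is rejected unless streaming mode is selected:

```java
public class ModeSelectionSketch {

  /** Hypothetical stand-in for the runner's two translation modes. */
  enum Mode {
    BATCH,
    STREAMING
  }

  /** Reads an assumed --streaming flag; defaults to batch when it is absent. */
  static Mode modeFromArgs(String[] args) {
    for (String arg : args) {
      if (arg.equals("--streaming") || arg.equals("--streaming=true")) {
        return Mode.STREAMING;
      }
    }
    return Mode.BATCH;
  }

  /** Rejects unbounded sources in batch mode, mirroring the error seen in this thread. */
  static String translate(Mode mode, String transform, boolean unbounded) {
    if (mode == Mode.BATCH && unbounded) {
      throw new UnsupportedOperationException(
          "The transform " + transform + " is currently not supported.");
    }
    return transform + " translated in " + mode + " mode";
  }

  public static void main(String[] args) {
    Mode mode = modeFromArgs(args);
    try {
      System.out.println(translate(mode, "Read(UnboundedKafkaSource)", true));
    } catch (UnsupportedOperationException e) {
      System.out.println("Batch mode cannot run this pipeline: " + e.getMessage());
    }
  }
}
```

Run without arguments, the sketch takes the batch path and reports the unsupported transform; run with the assumed streaming flag, the same transform is accepted.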
>>>>>>
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>> On 21 Mar 2016, at 23:39, William McCarthy <[email protected]> 
>>>>>>> wrote:
>>>>>>>
>>>>>>> I’ve attempted to run the TopHashtagsExample again, using both 
>>>>>>> ‘FlinkPipelineRunner’ and its fully qualified name. Both bomb out, 
>>>>>>> though the latter gets further. I hope this helps. Here is the output 
>>>>>>> of both:
>>>>>>>
>>>>>>> $ flink run -c 
>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample 
>>>>>>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner 
>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>>>>
>>>>>>> ------------------------------------------------------------
>>>>>>> The program finished with the following exception:
>>>>>>>
>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main 
>>>>>>> method caused an error.
>>>>>>>     at 
>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>>     at 
>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>     at 
>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>     at 
>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>     at 
>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' 
>>>>>>> specified 'FlinkPipelineRunner', supported pipeline runners 
>>>>>>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner, 
>>>>>>> DirectPipelineRunner]
>>>>>>>     at 
>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1486)
>>>>>>>     at 
>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:101)
>>>>>>>     at 
>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:286)
>>>>>>>     at 
>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>     at 
>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>     at 
>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>     at 
>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>     ... 6 more
>>>>>>> Caused by: java.lang.ClassNotFoundException: FlinkPipelineRunner
>>>>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>>>     at java.lang.Class.forName0(Native Method)
>>>>>>>     at java.lang.Class.forName(Class.java:264)
>>>>>>>     at 
>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1473)
>>>>>>>     ... 14 more
>>>>>>>
>>>>>>>
>>>>>>> $ flink run -c 
>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample 
>>>>>>> target/beam-1.0-SNAPSHOT.jar 
>>>>>>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner 
>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>>>>
>>>>>>> ------------------------------------------------------------
>>>>>>> The program finished with the following exception:
>>>>>>>
>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main 
>>>>>>> method caused an error.
>>>>>>>     at 
>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>>     at 
>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>     at 
>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>     at 
>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>     at 
>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>>> Caused by: java.lang.UnsupportedOperationException: The transform 
>>>>>>> Read(UnboundedKafkaSource) is currently not supported.
>>>>>>>     at 
>>>>>>> org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator.visitTransform(FlinkBatchPipelineTranslator.java:111)
>>>>>>>     at 
>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>>>>>>>     at 
>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>>>     at 
>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>>>     at 
>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>>>>>>>     at 
>>>>>>> com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>>>>>>>     at 
>>>>>>> org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:34)
>>>>>>>     at 
>>>>>>> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:130)
>>>>>>>     at 
>>>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:109)
>>>>>>>     at 
>>>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:50)
>>>>>>>     at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>>>>>>>     at 
>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>     at 
>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>     at 
>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>     at 
>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>     ... 6 more
>>>>>>>
>>>>>>>
>>>>>>>> On Mar 21, 2016, at 12:23 PM, Raghu Angadi <[email protected]> wrote:
>>>>>>>>
>>>>>>>> Thanks Max.
>>>>>>>>
>>>>>>>> Bill McCarthy,
>>>>>>>> I know you are unblocked and KafkaWriter is good enough. Please try 
>>>>>>>> KafkaIO source from my branch with Flink runner if you get a chance.
>>>>>>>>
>>>>>>>> thanks,
>>>>>>>> Raghu.
>>>>>>>>
>>>>>>>> On Mon, Mar 21, 2016 at 6:54 AM, Jean-Baptiste Onofré 
>>>>>>>> <[email protected]> wrote:
>>>>>>>> Thanks for the update Max !
>>>>>>>>
>>>>>>>> Regards
>>>>>>>> JB
>>>>>>>>
>>>>>>>>
>>>>>>>> On 03/21/2016 02:39 PM, Maximilian Michels wrote:
>>>>>>>> FYI: The Runner registration has been fixed. The Flink runner
>>>>>>>> explicitly registers as of [1]. Also, the SDK tries to look up the
>>>>>>>> PipelineRunner class in case it has not been registered [2].
>>>>>>>>
>>>>>>>> [1] https://github.com/apache/incubator-beam/pull/40
>>>>>>>> [2] https://github.com/apache/incubator-beam/pull/61
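A minimal sketch of the two-step lookup described above: first the registered simple names, then a class lookup by name if nothing was registered. This is plain Java and not the SDK's PipelineOptionsFactory code; RunnerLookupSketch, register, resolve and the two stub classes are hypothetical names used only for illustration:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RunnerLookupSketch {

  /** Registered runners, keyed by simple class name. */
  private final Map<String, Class<?>> registered = new LinkedHashMap<>();

  void register(Class<?> runnerClass) {
    registered.put(runnerClass.getSimpleName(), runnerClass);
  }

  /** Tries the registry first, then falls back to loading the class by name. */
  Class<?> resolve(String name) {
    Class<?> byName = registered.get(name);
    if (byName != null) {
      return byName;
    }
    try {
      return Class.forName(name);
    } catch (ClassNotFoundException e) {
      throw new IllegalArgumentException(
          "Unknown 'runner' specified '" + name + "', supported pipeline runners "
              + registered.keySet(),
          e);
    }
  }

  /** Hypothetical runner classes used only for this demonstration. */
  static class DirectRunnerStub {}

  static class FlinkRunnerStub {}

  public static void main(String[] args) {
    RunnerLookupSketch lookup = new RunnerLookupSketch();
    lookup.register(DirectRunnerStub.class);

    // Registered: the simple name is enough.
    System.out.println(lookup.resolve("DirectRunnerStub"));
    // Not registered: only the fully qualified name can be resolved.
    System.out.println(lookup.resolve(FlinkRunnerStub.class.getName()));
  }
}
```

In this sketch an unregistered runner is reachable only through its fully qualified name, which matches the behaviour reported earlier in the thread. Registering the runner explicitly is what makes the simple name work.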
>>>>>>>>
>>>>>>>> On Sat, Mar 19, 2016 at 6:43 PM, Maximilian Michels <[email protected]> 
>>>>>>>> wrote:
>>>>>>>> Great to see such a lively discussion here.
>>>>>>>>
>>>>>>>> I think we'll support sinks through the Write interface (like in
>>>>>>>> batched execution) and also have a dedicated wrapper for the Flink
>>>>>>>> sinks. This is a very pressing but easy to solve issue of the Flink
>>>>>>>> runner. Expect it to be in next week.
>>>>>>>>
>>>>>>>> Also, the proper registration of the runner is about to be merged.
>>>>>>>> We just need an ok from the contributor to merge the changes.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Max
>>>>>>>>
>>>>>>>> On Sat, Mar 19, 2016 at 12:42 AM, Dan Halperin <[email protected]> 
>>>>>>>> wrote:
>>>>>>>> Thanks Bill!
>>>>>>>>
>>>>>>>> Filed https://issues.apache.org/jira/browse/BEAM-136, but I'm glad 
>>>>>>>> it's not
>>>>>>>> blocking you!
>>>>>>>>
>>>>>>>> On Fri, Mar 18, 2016 at 4:04 PM, William McCarthy
>>>>>>>> <[email protected]> wrote:
>>>>>>>>
>>>>>>>> I tried that, but still no dice: Just to be clear, it’s not a blocker 
>>>>>>>> for
>>>>>>>> me, given that I have my example running, but for your information the
>>>>>>>> exception is below.
>>>>>>>>
>>>>>>>> I’ll watch the commit log on the beam incubator and look forward to
>>>>>>>> deleting my copy of Raghu’s contributions when they’re merged to 
>>>>>>>> master.
>>>>>>>>
>>>>>>>> Thanks again for everyone’s help,
>>>>>>>>
>>>>>>>> Bill
>>>>>>>>
>>>>>>>>
>>>>>>>> Command followed by exception:
>>>>>>>>
>>>>>>>> $ flink run -c
>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>>>>> target/beam-1.0-SNAPSHOT.jar
>>>>>>>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner
>>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>>>>>
>>>>>>>> ------------------------------------------------------------
>>>>>>>> The program finished with the following exception:
>>>>>>>>
>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>>>>> method caused an error.
>>>>>>>> at
>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>>> at
>>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>> at
>>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>> at
>>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' 
>>>>>>>> specified
>>>>>>>> 'org.apache.beam.runners.flink.FlinkPipelineRunner', supported pipeline
>>>>>>>> runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
>>>>>>>> DirectPipelineRunner]
>>>>>>>> at
>>>>>>>> com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>>>>>>>> at
>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>>>>>>>> at
>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>>>>>>>> at
>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>>>>>>>> at
>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>> at
>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>> at
>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>> at
>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>> ... 6 more
>>>>>>>>
>>>>>>>> On Mar 18, 2016, at 5:35 PM, Thomas Groh <[email protected]> wrote:
>>>>>>>>
>>>>>>>> I don't believe the FlinkPipelineRunner is registered the same way the
>>>>>>>> Dataflow & Direct Pipeline runners are registered; using
>>>>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner should work
>>>>>>>>
>>>>>>>> On Fri, Mar 18, 2016 at 2:10 PM, William McCarthy
>>>>>>>> <[email protected]> wrote:
>>>>>>>>
>>>>>>>> Thanks Dan,
>>>>>>>>
>>>>>>>> I tried that, but getting the below. Note that the jar contains the
>>>>>>>> FlinkPipelineRunner.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> % jar -tf target/beam-1.0-SNAPSHOT.jar | grep FlinkPipeline
>>>>>>>> org/apache/beam/runners/flink/FlinkPipelineRunner.class
>>>>>>>> org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.class
>>>>>>>> org/apache/beam/runners/flink/FlinkPipelineOptions.class
>>>>>>>> org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.class
>>>>>>>>
>>>>>>>> % flink run -c
>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>>>>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner
>>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>>>>>
>>>>>>>> ------------------------------------------------------------
>>>>>>>> The program finished with the following exception:
>>>>>>>>
>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>>>>> method caused an error.
>>>>>>>> at
>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>>> at
>>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>> at
>>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>> at
>>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' 
>>>>>>>> specified
>>>>>>>> 'FlinkPipelineRunner', supported pipeline runners
>>>>>>>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
>>>>>>>> DirectPipelineRunner]
>>>>>>>> at
>>>>>>>> com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>>>>>>>> at
>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>>>>>>>> at
>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>>>>>>>> at
>>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>>>>>>>> at
>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>> at
>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>> at
>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>> at
>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>> ... 6 more
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mar 18, 2016, at 5:00 PM, Dan Halperin <[email protected]> wrote:
>>>>>>>>
>>>>>>>> Thanks for catching that, Aljoscha!
>>>>>>>>
>>>>>>>> Note that the Flink runner should be available via a command-line 
>>>>>>>> option
>>>>>>>> as well: --runner=FlinkPipelineRunner.
>>>>>>>>
>>>>>>>> The list of valid values for that flag is computed by walking the
>>>>>>>> classpath at runtime, so as long as the Flink jar is present it'll 
>>>>>>>> work.
>>>>>>>>
>>>>>>>> On Fri, Mar 18, 2016 at 1:21 PM, Aljoscha Krettek <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>> looks like the example is being executed with the DirectPipelineRunner
>>>>>>>> which does not seem to be able to cope with UnboundedSource. You need 
>>>>>>>> to set
>>>>>>>> the runner to the FlinkRunner in the example code as described here:
>>>>>>>> https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example
>>>>>>>>
>>>>>>>> The Flink runner should be able to deal with UnboundedSource but has 
>>>>>>>> the
>>>>>>>> limitation that sources are always parallelism=1 (this is being worked 
>>>>>>>> on,
>>>>>>>> however).
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Aljoscha
>>>>>>>> On 18 Mar 2016, at 20:56, Dan Halperin <[email protected]> wrote:
>>>>>>>>
>>>>>>>> Looks like the Flink runner may not yet support arbitrary code written
>>>>>>>> with the UnboundedSource API. That is, it looks like the Flink runner
>>>>>>>> expects the sources to get translated away.
>>>>>>>>
>>>>>>>> Max?
>>>>>>>>
>>>>>>>> Dan
>>>>>>>>
>>>>>>>> On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy
>>>>>>>> <[email protected]> wrote:
>>>>>>>> Thanks Raghu,
>>>>>>>>
>>>>>>>> When I try to run it on flink using the incubator-beam code, i.e.
>>>>>>>>
>>>>>>>> flink run -c
>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>>>>> target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092
>>>>>>>> --topics=test_in --outputTopic=test_out
>>>>>>>>
>>>>>>>> I get this:
>>>>>>>>
>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>>>>>>> method caused an error.
>>>>>>>>      at
>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>>>      at
>>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>>      at
>>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>>      at
>>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>>      at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>>      at
>>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>>>      at
>>>>>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>>>> Caused by: java.lang.IllegalStateException: no evaluator registered
>>>>>>>> for Read(UnboundedKafkaSource)
>>>>>>>>      at
>>>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852)
>>>>>>>>      at
>>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>>>>>>>>      at
>>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>>>>      at
>>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>>>>      at
>>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>>>>>>>>      at
>>>>>>>> com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>>>>>>>>      at
>>>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
>>>>>>>>      at
>>>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
>>>>>>>>      at
>>>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
>>>>>>>>      at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>>>>>>>>      at
>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>>>>>>>>      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>      at
>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>      at
>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>      at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>      at
>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>>      ... 6 more
>>>>>>>>
>>>>>>>> Any ideas?
>>>>>>>>
>>>>>>>> Bill
>>>>>>>>
>>>>>>>> On Mar 18, 2016, at 2:47 PM, Raghu Angadi <[email protected]> wrote:
>>>>>>>>
>>>>>>>> Thanks for trying it.
>>>>>>>>
>>>>>>>> I fixed the CheckStyle error (not sure why my build is not failing).
>>>>>>>> Let me know if you see any issues running with Beam. I haven't tried 
>>>>>>>> it. I
>>>>>>>> should. In fact, Daniel Halperin says my patch should be against Beam.
>>>>>>>>
>>>>>>>> Raghu.
>>>>>>>>
>>>>>>>> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy
>>>>>>>> <[email protected]> wrote:
>>>>>>>> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and Raghu for
>>>>>>>> pointing me to working code.
>>>>>>>>
>>>>>>>> I’m in the middle of a hack day at the moment, so the speed of your
>>>>>>>> responses has been very welcome.
>>>>>>>>
>>>>>>>> In the first instance, I’ll try using your changes, Raghu. I’ve
>>>>>>>> cloned your repo, switched to the kafka branch and built both 
>>>>>>>> contrib/kafka
>>>>>>>> and contrib/examples/kafka. The contrib/kafka initially failed with a
>>>>>>>> CheckStyle error
>>>>>>>> (/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12:
>>>>>>>> 'private' modifier out of order with the JLS suggestions)… I’ve fixed 
>>>>>>>> that
>>>>>>>> in my local clone and now it’s building fine. I hope to be able to run 
>>>>>>>> your
>>>>>>>> contrib unchanged on top of the incubator-beam codebase, which will be 
>>>>>>>> what
>>>>>>>> I attempt to do now.
>>>>>>>>
>>>>>>>> Thanks again to all, for your swift help.
>>>>>>>>
>>>>>>>> Bill
>>>>>>>>
>>>>>>>> On Mar 18, 2016, at 12:55 PM, Raghu Angadi <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hi Bill,
>>>>>>>>
>>>>>>>> We have a fairly well-tested patch for KafkaIO (pr #121). It will be
>>>>>>>> merged soon. The example there keeps track of top hashtags in a 10-minute
>>>>>>>> sliding window and writes the results to another Kafka topic. Please 
>>>>>>>> try it
>>>>>>>> if you can. It is well tested on Google Cloud Dataflow. I have not run 
>>>>>>>> it
>>>>>>>> using the Flink runner.
>>>>>>>>
>>>>>>>> Raghu.
>>>>>>>>
>>>>>>>> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas
>>>>>>>> <[email protected]> wrote:
>>>>>>>> Hello Bill,
>>>>>>>>
>>>>>>>> This is a known limitation of the Flink Runner.
>>>>>>>> There is a JIRA issue for that
>>>>>>>> https://issues.apache.org/jira/browse/BEAM-127
>>>>>>>>
>>>>>>>> A wrapper for Flink sinks will come soon and as Beam evolves,
>>>>>>>> a more Beam-y solution will come as well.
>>>>>>>>
>>>>>>>> Kostas
>>>>>>>> On Mar 18, 2016, at 5:23 PM, William McCarthy
>>>>>>>> <[email protected]> wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I’m trying to write a proof-of-concept which takes messages from
>>>>>>>> Kafka, transforms them using Beam on Flink, then pushes the results 
>>>>>>>> onto a
>>>>>>>> different Kafka topic.
>>>>>>>>
>>>>>>>> I’ve used the KafkaWindowedWordCountExample as a starting point,
>>>>>>>> and that’s doing the first part of what I want to do, but it outputs 
>>>>>>>> to text
>>>>>>>> files as opposed to Kafka. FlinkKafkaProducer08 looks promising, but I 
>>>>>>>> can’t
>>>>>>>> figure out how to plug it into the pipeline. I was thinking that it 
>>>>>>>> would be
>>>>>>>> wrapped with an UnboundedFlinkSink, or some such, but that doesn’t 
>>>>>>>> seem to
>>>>>>>> exist.
>>>>>>>>
>>>>>>>> Any advice or thoughts on what I’m trying to do?
>>>>>>>>
>>>>>>>> I’m running the latest incubator-beam (as of last night from
>>>>>>>> Github), Flink 1.0.0 in cluster mode and Kafka 0.9.0.1, all on Google
>>>>>>>> Compute Engine (Debian Jessie).
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Bill McCarthy
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Jean-Baptiste Onofré
>>>>>>>> [email protected]
>>>>>>>> http://blog.nanthrax.net
>>>>>>>> Talend - http://www.talend.com
>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>
