Your calls should work: as long as you're on a commit after b2b77e380 (when we started implementing PipelineRunner), InProcessPipelineRunner should be a valid argument to PipelineOptions#setRunner.
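To illustrate why the compiler rejects `Class<InProcessPipelineRunner>` on older commits, here is a minimal, self-contained sketch. All types below are hypothetical stand-ins that only model the shape of the SDK's API; they are not the real Beam classes. Because `setRunner` takes a `Class<? extends PipelineRunner<?>>`, it only accepts classes that extend `PipelineRunner`. A runner class that does not (yet) extend it fails to compile, and one that does is accepted. The reflective `isAssignableFrom` call checks the same rule at runtime.

```java
// Hypothetical stand-in types: these model the shape of the SDK's API, not the real Beam classes.
public class RunnerBoundDemo {
    // Stand-in for the SDK's PipelineRunner<ResultT> base class.
    public abstract static class PipelineRunner<ResultT> {
        public abstract ResultT run();
    }

    // Stand-in for InProcessPipelineRunner on a commit where it extends PipelineRunner.
    public static class NewInProcessRunner extends PipelineRunner<String> {
        @Override
        public String run() {
            return "ran in process";
        }
    }

    // Stand-in for InProcessPipelineRunner on an older commit where it does not extend PipelineRunner.
    public static class OldInProcessRunner {
    }

    // Stand-in options holder using the same bounded wildcard as
    // PipelineOptions#setRunner(Class<? extends PipelineRunner<?>>).
    public static class Options {
        private Class<? extends PipelineRunner<?>> runner;

        public void setRunner(Class<? extends PipelineRunner<?>> runner) {
            this.runner = runner;
        }

        public Class<? extends PipelineRunner<?>> getRunner() {
            return runner;
        }
    }

    public static void main(String[] args) {
        Options options = new Options();
        // Compiles: NewInProcessRunner is a subtype of PipelineRunner<?>.
        options.setRunner(NewInProcessRunner.class);
        // Uncommenting the next line would fail to compile, analogous to the setRunner
        // error in Amir's message, because OldInProcessRunner is not a PipelineRunner.
        // options.setRunner(OldInProcessRunner.class);
        System.out.println(options.getRunner().getSimpleName());
        // The same subtype rule, checked reflectively at runtime.
        System.out.println(PipelineRunner.class.isAssignableFrom(OldInProcessRunner.class));
    }
}
```

The bounded wildcard is what turns a runner class that lives outside the `PipelineRunner` hierarchy into a compile-time error rather than a runtime failure.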
For an example, see InProcessPipelineRunnerTest (https://github.com/apache/incubator-beam/blob/b9116ac426f989af882e6df5dafc5da6c9f203d8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java#L73).

On Mon, May 2, 2016 at 9:43 AM, Dan Halperin <[email protected]> wrote:
> Hi Amir,
>
> The problem is likely in using DataflowPipelineOptions.class -- this is specific to the Cloud Dataflow service and the DataflowPipelineRunner. Try using just "PipelineOptions".
>
> Dan
>
> On Mon, May 2, 2016 at 8:26 AM, amir bahmanyari <[email protected]> wrote:
>
>> Thanks Dan.
>> I had actually tried it before, but got compilation errors when setting the InProcessPipelineRunner in the PipelineOptions object.
>> I would appreciate it if you could point me to working sample code.
>> FYI, this is my implementation:
>> import com.google.cloud.dataflow.sdk.options.PipelineOptions;
>> import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
>> DataflowPipelineOptions Myoptions =
>> PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
>> Myoptions.setRunner(InProcessPipelineRunner.class);
>>
>> I cannot set the runner to InProcessPipelineRunner in the last line:
>> The method setRunner(Class<? extends PipelineRunner<?>>) in the type PipelineOptions is not applicable for the arguments (Class<InProcessPipelineRunner>).
>> Thanks for your help.
>> Amir-
>>
>> ------------------------------
>> *From:* Dan Halperin <[email protected]>
>> *To:* [email protected]
>> *Sent:* Monday, May 2, 2016 12:23 AM
>> *Subject:* Re: KafkaIO Usage & Sample Code
>>
>> On Mon, May 2, 2016 at 12:22 AM, Dan Halperin <[email protected]> wrote:
>>
>> Hi Amir,
>>
>> As Frances suggested, you can use the InProcessPipelineRunner instead of the DirectPipelineRunner to execute your pipeline. (They're both in the codebase; the Direct runner is just the default. Use the --runner command line option.)
>>
>> Amending: it is relatively unlikely that the issues we caught in testing would affect you. So it should be safe for your use case to do this -- and definitely safe to at least try it out!
>>
>> Dan
>>
>> On Mon, May 2, 2016 at 12:17 AM, Amir Bahmanyari <[email protected]> wrote:
>>
>> Thanks gents.
>> What are our options in the meantime?
>> Cheers
>>
>> On May 2, 2016, at 12:00 AM, Dan Halperin <[email protected]> wrote:
>>
>> On Sun, May 1, 2016 at 10:29 PM, Jean-Baptiste Onofré <[email protected]> wrote:
>>
>> Oh, thanks Frances.
>>
>> I mixed up DirectPipelineRunner (the "old" local runner) and InProcessPipelineRunner (the "new" local runner) ;)
>>
>> We should remove the DirectPipelineRunner to avoid confusion. WDYT?
>>
>> We would like to do this soon, but there are some snags.
>>
>> As a preparation step, Thomas swapped the default runner from Direct to InProcess. (#178 <https://github.com/apache/incubator-beam/pull/178>)
>>
>> However, testing unfortunately exposed some issues with the InProcess runner. (Actually, I should say "fortunately", because the tests caught them! Yay!) So we had to roll it back. (#198 <https://github.com/apache/incubator-beam/pull/198>)
>>
>> Once we improve the InProcess runner, we can redo the default swap. After the swap, once the tests keep passing for a few days, we do intend to delete the current Direct pipeline runner and replace it with the current InProcess runner.
>>
>> Dan
>>
>> Regards
>> JB
>>
>> On 05/02/2016 03:12 AM, Frances Perry wrote:
>>
>> +Thomas, author of the InProcessPipelineRunner
>>
>> The DirectPipelineRunner doesn't yet support unbounded PCollections. You can try the InProcessPipelineRunner, which is the rewrite of local execution that supports unbounded PCollections and checks more thoroughly against the Beam Model.
>> (We'll rename it to DirectPipelineRunner in the near future, so that we don't have both, as soon as the functionality of the InProcessPipelineRunner is complete.)
>>
>> On Sun, May 1, 2016 at 4:38 PM, amir bahmanyari <[email protected]> wrote:
>>
>> Hi JB,
>> I rebuilt my code with the latest:
>> kafka-0.1.0-incubating-20160501.070733-11.jar
>> <https://repository.apache.org/content/groups/snapshots/org/apache/beam/kafka/0.1.0-incubating-SNAPSHOT/kafka-0.1.0-incubating-20160501.070733-11.jar>
>> java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
>> <https://repository.apache.org/content/groups/snapshots/org/apache/beam/java-sdk-all/0.1.0-incubating-SNAPSHOT/java-sdk-all-0.1.0-incubating-20160501.070453-25.jar>
>>
>> Tried _without setting withMaxNumRecords()_:
>> Throws java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
>> at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>> at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>> at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>
>> _With setting withMaxNumRecords()_, I see the thread running with no exceptions like the above, waiting for incoming Kafka data, but the method that obtains the data in processElement(ProcessContext ctx) never executes.
>> Therefore, nothing goes into apply(TextIO.Write.to("c:\\temp\\KafkaOut\\Kafkadata.txt")).
>>
>> The Kafka broker reports my laptop's IP address as connected to it, so that part is OK.
>> Everything looks OK on the server side.
>> Doesn't look like it's my lucky day.
>> I appreciate any help/feedback/suggestions.
>> Cheers
>>
>> ------------------------------------------------------------------------
>> *From:* Jean-Baptiste Onofré <[email protected]>
>> *To:* [email protected]
>> *Sent:* Friday, April 29, 2016 10:36 PM
>> *Subject:* Re: KafkaIO Usage & Sample Code
>>
>> As I said in my previous e-mail, until recently DirectPipelineRunner didn't support unbounded sources.
>>
>> It's now fixed, so if you take the latest nightly build, or build master, it should work.
>>
>> As a workaround, you can also limit the number of messages consumed from Kafka (and so work with a bounded source).
>>
>> Regards
>> JB
>>
>> On 04/29/2016 07:12 PM, amir bahmanyari wrote:
>> > Hi colleagues,
>> > I am moving this conversation to the users mailing list as per Max’s suggestion.
>> > Thanks Max.
>> > Hi JB,
>> > Hope all is great.
>> > Is there a resolution to the exception I sent last night, please?
>> > When will the sample code for using KafkaIO be released?
>> > I really appreciate your valuable time. Below is the exception for your reference.
>> > This is how it is used in my code:
>> >
>> > p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
>> >
>> > Have a wonderful weekend.
>> > Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
>> > at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>> > at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>> > at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>> > at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>> > at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
>> > at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
>> > at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
>> > at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
>> > at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
>> > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>> > at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
>> > Kind Regards,
>> > Amir
>>
>> --
>> Jean-Baptiste Onofré
>> [email protected]
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
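JB's workaround in the thread, capping how many records are read from Kafka with withMaxNumRecords(), turns an unbounded read into a bounded one that the old DirectPipelineRunner can evaluate. The sketch below models only that idea with plain `java.util.stream`; it does not use KafkaIO or any Beam API, and `fakeKafkaTopic` and `readBounded` are hypothetical names standing in for the topic and the capped read. One possible consequence, inferred from the `DirectPipelineRunner$Evaluator` frames in the stack trace rather than confirmed anywhere in the thread, is that when transforms are evaluated one after another, the downstream step cannot start until the capped read has collected all N records.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical model of a capped read; it does not use KafkaIO or any Beam API.
public class BoundedReadDemo {
    // Stand-in for an unbounded Kafka topic: an endless, ordered stream of messages.
    static Stream<String> fakeKafkaTopic() {
        return Stream.iterate(0L, offset -> offset + 1).map(offset -> "message-" + offset);
    }

    // Models the idea behind withMaxNumRecords(n): stop after n records,
    // so the read yields a finite (bounded) collection.
    public static List<String> readBounded(long maxNumRecords) {
        return fakeKafkaTopic().limit(maxNumRecords).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Batch-style evaluation: the whole capped read finishes first.
        List<String> records = readBounded(3);
        // Only then is the downstream step applied to the collected records.
        records.stream().map(String::toUpperCase).forEach(System.out::println);
    }
}
```

If fewer than N messages ever arrive on the real topic, a read modeled this way would simply keep waiting, which is worth keeping in mind when choosing the cap.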
