Hi Amir,

The problem is likely the use of DataflowPipelineOptions.class: it is specific to the Cloud Dataflow service and the DataflowPipelineRunner. Try using just "PipelineOptions".
Dan

On Mon, May 2, 2016 at 8:26 AM, amir bahmanyari <[email protected]> wrote:

> Thanks Dan.
> I actually had tried it before but got compilation errors at setting the
> InProcessPipelineRunner in the PipelineOptions object.
> I appreciate it if you point me to a working sample code.
> FYI, This is my implementation:
>
> import com.google.cloud.dataflow.sdk.options.PipelineOptions;
> import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
> DataflowPipelineOptions Myoptions =
>     PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
> Myoptions.setRunner(InProcessPipelineRunner.class);
>
> I cannot set runner as InProcessPipelineRunner in the last line:
> The method setRunner(Class<? extends PipelineRunner<?>>) in the type
> PipelineOptions is not applicable for the arguments
> (Class<InProcessPipelineRunner>).
> Thanks for your help.
> Amir-
>
> ------------------------------
> *From:* Dan Halperin <[email protected]>
> *To:* [email protected]
> *Sent:* Monday, May 2, 2016 12:23 AM
> *Subject:* Re: KafkaIO Usage & Sample Code
>
> On Mon, May 2, 2016 at 12:22 AM, Dan Halperin <[email protected]> wrote:
>
> Hi Amir,
>
> As Frances suggested, you can use the InProcessPipelineRunner instead of
> the DirectPipelineRunner to execute your pipeline. (They're both in the
> codebase, it's just that the Direct runner is the default. Use the --runner
> command line option.)
>
> Amending: it is relatively unlikely that the issues that we caught in
> testing would affect you. So it should be safe for your use case to do this
> -- and definitely safe to at least try it out!
>
> Dan
>
> On Mon, May 2, 2016 at 12:17 AM, Amir Bahmanyari <[email protected]>
> wrote:
>
> Thanks gents
> What are our options in the meanwhile?
> Cheers
>
> Sent from my iPhone
>
> On May 2, 2016, at 12:00 AM, Dan Halperin <[email protected]> wrote:
>
> On Sun, May 1, 2016 at 10:29 PM, Jean-Baptiste Onofré <[email protected]>
> wrote:
>
> Oh, thanks Frances.
>
> I mixed DirectPipelineRunner ("old" local runner), and
> InProcessPipelineRunner ("new" local runner) ;)
>
> We should remove the DirectPipelineRunner to avoid confusion. WDYT ?
>
> We would like to do this soon, but there are some snags.
>
> As a preparation step, Thomas swapped the default runner from Direct to
> InProcess. (#178 <https://github.com/apache/incubator-beam/pull/178>)
>
> However, testing unfortunately exposed some issues with the InProcess
> runner. (Actually, I should say "fortunately" because the tests caught it!
> Yay!) So we had to roll it back. (#198
> <https://github.com/apache/incubator-beam/pull/198>)
>
> Once we improve the InProcess runner, we can re-do the default swap. After
> the swap, once the tests keep passing for a few days, we do indeed intend
> to delete the current Direct pipeline runner and replace it with the
> current InProcess runner.
>
> Dan
>
> Regards
> JB
>
> On 05/02/2016 03:12 AM, Frances Perry wrote:
>
> +Thomas, author of the InProcessPipelineRunner
>
> The DirectPipelineRunner doesn't yet support unbounded PCollections. You
> can try using the InProcessPipelineRunner, which is the re-write of
> local execution that provides support for unbounded PCollections and
> better checking against the Beam Model. (We'll be renaming this to the
> DirectPipelineRunner in the near future to avoid having both as soon as
> the functionality of the InProcessPipelineRunner is complete.)
>
> On Sun, May 1, 2016 at 4:38 PM, amir bahmanyari <[email protected]> wrote:
>
> Hi JB,
> I rebuilt my code with the latest:
> kafka-0.1.0-incubating-20160501.070733-11.jar
> <https://repository.apache.org/content/groups/snapshots/org/apache/beam/kafka/0.1.0-incubating-SNAPSHOT/kafka-0.1.0-incubating-20160501.070733-11.jar>
> java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
> <https://repository.apache.org/content/groups/snapshots/org/apache/beam/java-sdk-all/0.1.0-incubating-SNAPSHOT/java-sdk-all-0.1.0-incubating-20160501.070453-25.jar>
>
> Tried _without setting withMaxNumRecords()_:
> Throws java.lang.IllegalStateException: no evaluator registered for
> Read(UnboundedKafkaSource)
>     at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>
> _With setting withMaxNumRecords()_, I see the thread is running, no
> exceptions like above, waiting for incoming Kafka data, but the
> method obtaining the data from processElement(ProcessContext ctx)
> never executes.
> Therefore, nothing goes into
> apply(TextIO.Write.to("c:\\temp\\KafkaOut\\Kafkadata.txt")).
>
> I see the Kafka broker reports my laptop IP address as getting a
> connection to it, OK.
> Everything looks OK at the server side.
> Doesn't look like it's my lucky day.
> I appreciate any help/feedback/suggestion.
> Cheers
>
> ------------------------------------------------------------------------
> *From:* Jean-Baptiste Onofré <[email protected]>
> *To:* [email protected]
> *Sent:* Friday, April 29, 2016 10:36 PM
> *Subject:* Re: KafkaIO Usage & Sample Code
>
> As I said in my previous e-mail, until recently DirectPipelineRunner
> didn't support Unbounded.
>
> It's now fixed, so if you take the latest nightly build, or build master,
> it should work.
>
> As a workaround, you can also limit the number of messages consumed from
> Kafka (and so work with bounded).
>
> Regards
> JB
>
> On 04/29/2016 07:12 PM, amir bahmanyari wrote:
> > Hi colleagues,
> > I am moving this conversation to this users mailing list as per Max’s
> > suggestion.
> > Thanks Max.
> > Hi JB,
> > Hope all is great.
> > Is there a resolution to the exception I sent last night pls?
> > When would the sample code to use KafkaIO be released?
> > I really appreciate your valuable time. Below is the exception for your
> > reference.
> > This is how it gets used in my code:
> >
> > p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
> >
> > Have a wonderful weekend.
> > Exception in thread "main" java.lang.IllegalStateException: no evaluator
> > registered for Read(UnboundedKafkaSource)
> >     at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
> >     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
> >     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> >     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> >     at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
> >     at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
> >     at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
> >     at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
> >     at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
> >     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
> >     at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
> > Kind Regards,
> > Amir
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
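
Note on the compile error quoted in this thread: the sketch below illustrates the Java rule behind "setRunner(Class<? extends PipelineRunner<?>>) ... is not applicable for the arguments (Class<InProcessPipelineRunner>)". It uses stand-in types only, not the real Beam or Dataflow SDK classes. A Class<X> argument is accepted only when X is a subtype of the PipelineRunner type that the options interface itself refers to. The nested PipelineRunner, PipelineOptions and InProcessPipelineRunner merely mirror names from the thread; RunnerOptionsSketch, SimpleOptions, UnrelatedRunner, isAcceptedBySetRunner and configuredRunnerName are hypothetical names made up for illustration. Whether the actual failure came from mixing runner and options types from different SDK packages is an assumption; the thread does not confirm it.

```java
public class RunnerOptionsSketch {

    /** Stand-in for a runner base type (plays the role of PipelineRunner<R>). */
    abstract static class PipelineRunner<R> {
        abstract R run(String pipelineName);
    }

    /** Stand-in runner that belongs to the hierarchy above. */
    static class InProcessPipelineRunner extends PipelineRunner<String> {
        @Override
        String run(String pipelineName) {
            return "ran " + pipelineName + " in process";
        }
    }

    /** Hypothetical runner from an unrelated hierarchy (e.g. another SDK package). */
    static class UnrelatedRunner { }

    /** Stand-in options interface with the same setRunner signature as in the error. */
    interface PipelineOptions {
        Class<? extends PipelineRunner<?>> getRunner();
        void setRunner(Class<? extends PipelineRunner<?>> runner);
    }

    /** Trivial in-memory implementation of the stand-in options. */
    static class SimpleOptions implements PipelineOptions {
        private Class<? extends PipelineRunner<?>> runner;

        @Override
        public Class<? extends PipelineRunner<?>> getRunner() {
            return runner;
        }

        @Override
        public void setRunner(Class<? extends PipelineRunner<?>> runner) {
            this.runner = runner;
        }
    }

    /** Runtime analogue of the compile-time check: is the class a PipelineRunner subtype? */
    static boolean isAcceptedBySetRunner(Class<?> candidate) {
        return PipelineRunner.class.isAssignableFrom(candidate);
    }

    /** Sets the runner on plain options and returns the configured runner's simple name. */
    static String configuredRunnerName() {
        PipelineOptions options = new SimpleOptions();
        // Compiles: InProcessPipelineRunner extends this file's PipelineRunner<String>.
        options.setRunner(InProcessPipelineRunner.class);
        // Would NOT compile, with the same kind of message as in the thread:
        // options.setRunner(UnrelatedRunner.class);
        return options.getRunner().getSimpleName();
    }

    public static void main(String[] args) {
        System.out.println("configured runner: " + configuredRunnerName());
        System.out.println("InProcessPipelineRunner accepted: "
                + isAcceptedBySetRunner(InProcessPipelineRunner.class));
        System.out.println("UnrelatedRunner accepted: "
                + isAcceptedBySetRunner(UnrelatedRunner.class));
    }
}
```

In a real project the equivalent check is to confirm that the options type, the PipelineRunner type and the runner class are all imported from the same SDK (the same package root and version) before calling setRunner.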
