Hi Amir,

The problem is likely in using DataflowPipelineOptions.class -- this is
specific to the Cloud Dataflow service and the DataflowPipelineRunner. Try
using just "PipelineOptions".

Dan

On Mon, May 2, 2016 at 8:26 AM, amir bahmanyari <[email protected]> wrote:

> Thanks Dan.
> I actually had tried it before but got compilation errors at setting the 
> InProcessPipelineRunner
>  in the PipelineOptions object..
> I appreciate it if you point me to a working sample code.
> FYI, This is my implementation:
> import com.google.cloud.dataflow.sdk.options.PipelineOptions;
> import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
> DataflowPipelineOptions Myoptions =
> PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
> Myoptions.setRunner(InProcessPipelineRunner.class);
>
> I cannot set runner as InProcessPipelineRunner in the last line:
> The method setRunner(Class<? extends PipelineRunner<?>>) in the type
> PipelineOptions is not applicable for the arguments
> (Class<InProcessPipelineRunner>).
> Thanks for your help.
> Amir-
>
>
> ------------------------------
> *From:* Dan Halperin <[email protected]>
> *To:* [email protected]
> *Sent:* Monday, May 2, 2016 12:23 AM
> *Subject:* Re: KafkaIO Usage & Sample Code
>
> On Mon, May 2, 2016 at 12:22 AM, Dan Halperin <[email protected]> wrote:
>
> Hi Amir,
>
> As Frances suggested, you can use the InProcessPipelineRunner instead of
> the DirectPipelineRunner to execute your pipeline. (They're both in the
> codebase, it's just that the Direct runner is the default. Use the --runner
> command line option.)
>
>
> Amending: it is relatively unlikely that the issues that we caught in
> testing would affect you. So it should be safe for your use case to do this
> -- and definitely safe to at least try it out!
>
>
>
> Dan
>
> On Mon, May 2, 2016 at 12:17 AM, Amir Bahmanyari <[email protected]>
> wrote:
>
> Thanks gents
> What are our options in the meanwhile?
> Cheers
>
> Sent from my iPhone
>
> On May 2, 2016, at 12:00 AM, Dan Halperin <[email protected]> wrote:
>
> On Sun, May 1, 2016 at 10:29 PM, Jean-Baptiste Onofré <[email protected]>
> wrote:
>
> Oh, thanks Frances.
>
> I mixed DirectPipelineRunner ("old" local runner), and
> InProcessPipelineRunner ("new" local runner) ;)
>
> We should remove the DirectPipelineRunner to avoid confusion. WDYT ?
>
>
> We would like to do this soon, but there are some snags.
>
> As a preparation step, Thomas swapped the default runner from Direct to
> InProcess. (#178 <https://github.com/apache/incubator-beam/pull/178>)
>
> However, testing unfortunately exposed some issues with the InProcess
> runner. (Actually, I should say "fortunately" because the tests caught it!
> Yay!) So we had to roll it back. (#198
> <https://github.com/apache/incubator-beam/pull/198>)
>
> Once we improve the InProcess runner, we can re-do the default swap. After
> the swap, once the tests keep passing for a few days, we do indeed intend
> to delete the current Direct pipeline runner and replace it with the
> current InProcess runner.
>
> Dan
>
>
>
> Regards
> JB
>
> On 05/02/2016 03:12 AM, Frances Perry wrote:
>
> +Thomas, author of the InProcessPipelineRunner
>
> The DirectPipelineRunner doesn't yet support unbounded PCollections. You
> can try using the InProcessPipelineRunner, which is the re-write of
> local execution that provides support for unbounded PCollections and
> better checking against the Beam Model. (We'll be renaming this to the
> DirectPipelineRunner in the near future to avoid having both as soon as
> the functionality of the InProcessPipelineRunner is complete.)
>
> On Sun, May 1, 2016 at 4:38 PM, amir bahmanyari <[email protected]
> <mailto:[email protected]>> wrote:
>
>     Hi JB,
>     I rebuilt my code with the latest :
>     kafka-0.1.0-incubating-20160501.070733-11.jar
>     <
> https://repository.apache.org/content/groups/snapshots/org/apache/beam/kafka/0.1.0-incubating-SNAPSHOT/kafka-0.1.0-incubating-20160501.070733-11.jar
> >
>     java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
>     <
> https://repository.apache.org/content/groups/snapshots/org/apache/beam/java-sdk-all/0.1.0-incubating-SNAPSHOT/java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
> >
>
>
>     Tried _without setting withMaxNumRecords()_:
>     Throws java.lang.IllegalStateException: no evaluator registered for
>     Read(UnboundedKafkaSource)
>     at
>
> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>     at
>
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>     at
>
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>
>     _With setting ithMaxNumRecords(_), I see the thread is running, no
>     exceptions like above, waiting for incoming Kafka data, but the
>     method obtaining the data from processElement(ProcessContext ctx)
>     never executes.
>     Therefore, nothing goes into apply(TextIO.Write.to
> <http://textio.write.to/>
>     <http://TextIO.Write.to <http://textio.write.to/>
> >("c:\\temp\\KafkaOut\\Kafkadata.txt")).
>
>     I see Kafka Broker reports my laptop IP address as getting a
>     connection to it, OK.
>     Everything looks OK at the server side.
>     Doesn't look like its my lucky day.
>     I appreciate any help/feedback/suggetion.
>     Cheers
>
>
> ------------------------------------------------------------------------
>     *From:* Jean-Baptiste Onofré <[email protected] <mailto:[email protected]
> >>
>     *To:* [email protected]
>     <mailto:[email protected]>
>     *Sent:* Friday, April 29, 2016 10:36 PM
>     *Subject:* Re: KafkaIO Usage & Sample Code
>
>
>     As I said in my previous e-mail, until recently DirectPipelineRunner
>     didn't support Unbounded.
>
>     It's now fixed, so if you take a latest nightly build, or build master,
>     it should work.
>
>     As workaround, you can also limit the number of message consumed from
>     Kafka (and so work with bounded).
>
>     Regards
>     JB
>
>     On 04/29/2016 07:12 PM, amir bahmanyari wrote:
>      > Hi colleagues,
>      > I am moving this conversation to this users mailing list as per
> Max’s
>      > suggestion.
>      > Thanks Max.
>      > Hi JB,
>      > Hope all is great.
>      > Is there a resolution to the exception I sent last night pls?
>      > When would the sample code to use KafkaIO be released?
>      > I really appreciate your valuable time. Below is the exception
>     for your
>      > reference.
>      > This is how it gets used in my code:
>      >
>      >
>
> p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
>      >
>      > Have a wonderful weekend.
>      > Exception in thread "main" java.lang.IllegalStateException: no
>     evaluator
>      > registered for Read(UnboundedKafkaSource)
>      >        at
>      >
>
> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>      >        at
>      >
>
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>      >        at
>      >
>
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>      >        at
>      >
>
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>      >        at
>      >
>
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
>      >        at
>      >
> org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
>      >        at
>      >
>
> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
>      >        at
>      >
>
> org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
>      >        at
>      >
>
> org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
>      >        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>      >        at
>      >
>     benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
>      > Kind Regards,
>      > Amir
>
>
>     --
>     Jean-Baptiste Onofré
>     [email protected] <mailto:[email protected]>
>     http://blog.nanthrax.net <http://blog.nanthrax.net/>
>     Talend - http://www.talend.com <http://www.talend.com/>
>
>
>
>
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
>
>
>
>
>
>

Reply via email to