This may help trace it:Exception in thread "main" 
java.lang.IllegalStateException: no evaluator registered for 
Read(UnboundedKafkaSource) at 
org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
 at 
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221) 
at 
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217) 
at 
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217) 
at 
org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
 at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261) at 
org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
 at 
org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
 at 
org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
 at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182) at 
benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)

      From: Jean-Baptiste Onofré <j...@nanthrax.net>
 To: dev@beam.incubator.apache.org 
 Sent: Thursday, April 28, 2016 10:08 PM
 Subject: Re: [DISCUSS] Beam IO &runners native IO
   
I gonna take a look. The DirectPipelineRunner didn't support the 
unbounded collection (it has been fixed last night AFAIR). It could be 
related.

Regards
JB

On 04/29/2016 07:00 AM, amir bahmanyari wrote:
> Hi JB,I used the sample KafkaIO usage 
> below.p.apply(KafkaIO.read().withBootstrapServers("kafkahost:9092").withTopics(topics));
> p.run();
>
> It threw the following at p.run():Exception in thread "main" 
> java.lang.IllegalStateException: no evaluator registered for 
> Read(UnboundedKafkaSource) at 
> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>  I am sure I am missing something.Would be great if I could see a sample 
> code.I appreciate it sir.Cheers
>
>        From: Jean-Baptiste Onofré <j...@nanthrax.net>
>  To: dev@beam.incubator.apache.org
>  Sent: Thursday, April 28, 2016 5:30 AM
>  Subject: [DISCUSS] Beam IO &runners native IO
>
> Hi all,
>
> regarding the recent threads on the mailing list, I would like to start
> a format discussion around the IO.
> As we can expect the first contributions on this area (I already have
> some work in progress around this ;)), I think it's a fair discussion to
> have.
>
> Now, we have two kinds of IO: the one "generic" to Beam, the one "local"
> to the runners.
>
> For example, let's take Kafka: we have the KafkaIO (in IO), and for
> instance, we have the spark-streaming kafka connector (in Spark Runner).
>
> Right now, we have two approaches for the user:
> 1. In the pipeline, we use KafkaIO from Beam: it's the preferred
> approach for sure. However, the user may want to use the runner specific
> IO for two reasons:
>      * Beam doesn't provide the IO yet (for instance, spark cassandra
> connector is available whereas we don't have yet any CassandraIO (I'm
> working on it anyway ;))
>      * The runner native IO is optimized or contain more features that the
> Beam native IO
> 2. So, for the previous reasons, the user could want to use the native
> runner IO. The drawback of this approach is that the pipeline will be
> tight to a specific runner, which is completely against the Beam design.
>
> I wonder if it wouldn't make sense to add flag on the IO API (and
> related on Runner API) like .useNative().
>
> For instance, the user would be able to do:
>
> pipeline.apply(KafkaIO.read().withBootstrapServers("...").withTopics("...").useNative(true);
>
> then, if the runner has a "native" IO, it will use it, else, if
> useNative(false) (the default), it won't use any runner native IO.
>
> The point there is for the configuration: assuming the Beam IO and the
> runner IO can differ, it means that the "Beam IO" would have to populate
> all runner specific IO configuration.
>
> Of course, it's always possible to use a PTransform to wrap the runner
> native IO, but we are back on the same concern: the pipeline will be
> couple to a specific runner.
>
> The purpose of the useNative() flag is to "automatically" inform the
> runner to use a specific IO if it has one: the pipeline stays decoupled
> from the runners.
>
> Thoughts ?
>
> Thanks
> Regards
> JB
>

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


  

Reply via email to