This may help trace it:

Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
    at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
    at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
    at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
    at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
    at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
    at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
    at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
    at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
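The trace is consistent with JB's explanation in this thread that the DirectPipelineRunner did not yet support unbounded collections. Judging by the message, the runner visits each transform and looks up an evaluator for it, and `Read(UnboundedKafkaSource)` had none. The following is a deliberately simplified model of that lookup, not Beam's actual code. `ToyBatchRunner`, `Transform`, `BoundedRead` and `UnboundedRead` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical toy runner: maps each transform class to an evaluator and fails
// fast when a transform has none. A simplified model, not Beam's DirectPipelineRunner.
final class ToyBatchRunner {
    private final Map<Class<? extends Transform>, Function<Transform, String>> evaluators =
            new HashMap<>();

    // Registers the evaluator used for every transform of exactly this class.
    void register(Class<? extends Transform> type, Function<Transform, String> evaluator) {
        evaluators.put(type, evaluator);
    }

    // Looks up the evaluator for the transform's concrete class; a missing entry
    // produces an error shaped like the one at the top of the stack trace.
    String evaluate(Transform transform) {
        Function<Transform, String> evaluator = evaluators.get(transform.getClass());
        if (evaluator == null) {
            throw new IllegalStateException("no evaluator registered for " + transform.name());
        }
        return evaluator.apply(transform);
    }

    public static void main(String[] args) {
        ToyBatchRunner runner = new ToyBatchRunner();
        // A batch-only runner: only bounded reads have an evaluator.
        runner.register(BoundedRead.class, t -> "evaluated " + t.name());
        System.out.println(runner.evaluate(new BoundedRead("Read(TextSource)")));
        try {
            runner.evaluate(new UnboundedRead("Read(UnboundedKafkaSource)"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}

// Hypothetical transform types standing in for bounded and unbounded reads.
interface Transform {
    String name();
}

final class BoundedRead implements Transform {
    private final String name;

    BoundedRead(String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }
}

final class UnboundedRead implements Transform {
    private final String name;

    UnboundedRead(String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }
}
```

In this model the fix is simply to register an evaluator for the unbounded read type, which is roughly what adding unbounded support to a runner amounts to from the user's point of view.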
From: Jean-Baptiste Onofré <j...@nanthrax.net>
To: dev@beam.incubator.apache.org
Sent: Thursday, April 28, 2016 10:08 PM
Subject: Re: [DISCUSS] Beam IO & runners native IO

I'm going to take a look.

The DirectPipelineRunner didn't support unbounded collections (it was fixed last night, AFAIR). It could be related.

Regards
JB

On 04/29/2016 07:00 AM, amir bahmanyari wrote:
> Hi JB,
> I used the sample KafkaIO usage below:
> p.apply(KafkaIO.read().withBootstrapServers("kafkahost:9092").withTopics(topics));
> p.run();
>
> It threw the following at p.run():
> Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
>     at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>
> I am sure I am missing something. It would be great if I could see some sample code.
> I appreciate it, sir.
> Cheers
>
> From: Jean-Baptiste Onofré <j...@nanthrax.net>
> To: dev@beam.incubator.apache.org
> Sent: Thursday, April 28, 2016 5:30 AM
> Subject: [DISCUSS] Beam IO & runners native IO
>
> Hi all,
>
> regarding the recent threads on the mailing list, I would like to start
> a formal discussion around the IO.
> As we can expect the first contributions in this area (I already have
> some work in progress around this ;)), I think it's a fair discussion to
> have.
>
> Now, we have two kinds of IO: the ones "generic" to Beam and the ones "local"
> to the runners.
>
> For example, let's take Kafka: we have the KafkaIO (in IO), and we also
> have, for instance, the spark-streaming Kafka connector (in the Spark Runner).
>
> Right now, we have two approaches for the user:
> 1. In the pipeline, we use KafkaIO from Beam: it's the preferred
> approach for sure.
> However, the user may want to use the runner-specific
> IO for two reasons:
> * Beam doesn't provide the IO yet (for instance, the Spark Cassandra
> connector is available, whereas we don't have any CassandraIO yet (I'm
> working on it anyway ;))
> * The runner-native IO is optimized or contains more features than the
> Beam native IO
> 2. So, for the previous reasons, the user may want to use the native
> runner IO. The drawback of this approach is that the pipeline will be
> tied to a specific runner, which is completely against the Beam design.
>
> I wonder if it would make sense to add a flag to the IO API (and a
> related one to the Runner API), like .useNative().
>
> For instance, the user would be able to do:
>
> pipeline.apply(KafkaIO.read().withBootstrapServers("...").withTopics("...").useNative(true));
>
> Then, if the runner has a "native" IO, it will use it. With
> useNative(false) (the default), it won't use any runner-native IO.
>
> The tricky part is the configuration: assuming the Beam IO and the
> runner IO can differ, the "Beam IO" would have to populate
> all the runner-specific IO configuration.
>
> Of course, it's always possible to use a PTransform to wrap the runner-
> native IO, but we are back to the same concern: the pipeline will be
> coupled to a specific runner.
>
> The purpose of the useNative() flag is to "automatically" inform the
> runner to use a specific IO if it has one: the pipeline stays decoupled
> from the runners.
>
> Thoughts?
>
> Thanks
> Regards
> JB

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
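To make the proposed selection rule concrete, here is a minimal, runner-agnostic sketch in plain Java. It assumes a hypothetical `IoSpec` (a portable description of a read that carries the flag) and a hypothetical `ToyRunner` that may hold native replacements keyed by IO kind. `useNative()` is only the name used in the proposal above, not an existing Beam method, and none of these types exist in Beam.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of the proposed useNative() flag. None of these types exist
// in Beam; they only illustrate the selection rule described in the proposal.
final class NativeIoSketch {
    public static void main(String[] args) {
        ToyRunner spark = new ToyRunner("spark");
        // The runner registers a native connector for "kafka" plus a translation of the
        // portable configuration into that connector's settings (the configuration concern).
        spark.registerNative("kafka", spec -> "native-kafka " + spec.config);

        // false mirrors the proposal's default, useNative(false).
        IoSpec portable = new IoSpec("kafka", Map.of("bootstrap.servers", "kafkahost:9092"), false);

        System.out.println(spark.plan(portable));                 // default: Beam IO
        System.out.println(spark.plan(portable.useNative(true))); // opted in: native IO
        // A runner without a native Kafka IO falls back to the Beam IO.
        System.out.println(new ToyRunner("direct").plan(portable.useNative(true)));
    }
}

// Portable description of a read: the IO kind, its configuration, and the proposed flag.
final class IoSpec {
    final String kind;
    final Map<String, String> config;
    final boolean preferNative;

    IoSpec(String kind, Map<String, String> config, boolean preferNative) {
        this.kind = kind;
        this.config = config;
        this.preferNative = preferNative;
    }

    // Fluent copy with the flag set, in the style of the proposed .useNative(true).
    IoSpec useNative(boolean flag) {
        return new IoSpec(kind, config, flag);
    }
}

// Toy runner holding optional native replacements keyed by IO kind.
final class ToyRunner {
    private final String name;
    private final Map<String, Function<IoSpec, String>> nativeIo = new HashMap<>();

    ToyRunner(String name) {
        this.name = name;
    }

    void registerNative(String kind, Function<IoSpec, String> translator) {
        nativeIo.put(kind, translator);
    }

    // Native IO is chosen only if the user opted in AND the runner has one for this kind;
    // otherwise the portable Beam IO is used, so the same pipeline runs on any runner.
    String plan(IoSpec spec) {
        Function<IoSpec, String> translator = spec.preferNative ? nativeIo.get(spec.kind) : null;
        if (translator != null) {
            return name + ": " + translator.apply(spec);
        }
        return name + ": beam-" + spec.kind + " " + spec.config;
    }
}
```

The translator function passed to `registerNative` is where the configuration concern raised in the proposal shows up: each runner would need its own mapping from the portable settings to its connector's settings, and any setting without an equivalent would have to be handled (or rejected) there.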