Hi,

I think the problem is that the DirectPipelineRunner does not seem to support UnboundedSource (of which UnboundedKafkaSource is an example). You can try running your pipeline with the InProcessPipelineRunner or the FlinkPipelineRunner instead.
Cheers,
Aljoscha

On Fri, 29 Apr 2016 at 19:12 amir bahmanyari <[email protected]> wrote:

> Hi colleagues,
> I am moving this conversation to this users mailing list as per Max’s
> suggestion.
> Thanks Max.
>
> Hi JB,
> Hope all is great.
> Is there a resolution to the exception I sent last night pls?
> When would the sample code to use KafkaIO be released?
> I really appreciate your valuable time. Below is the exception for your
> reference.
> This is how it gets used in my code:
>
> p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));
>
> Have a wonderful weekend.
>
> Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
>     at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>     at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
>     at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
>     at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
>     at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
>     at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>     at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
>
> Kind Regards,
>
> Amir
