Bill, Thanks for point it out! I think this is a real bug. Do you want to file PR in github with the fix? You can find the instructions here:
https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes Guozhang On Fri, Nov 20, 2015 at 8:32 AM, Bill Bejeck <bbej...@gmail.com> wrote: > Hi All, > > I'm starting to experiment with the lower-level Processor Client API found > on the KIP-28 wiki. > > When starting the KafkaStream I get the following Exception: > > Exception in thread "main" java.util.NoSuchElementException: id: SINK > at > > org.apache.kafka.streams.processor.internals.QuickUnion.root(QuickUnion.java:40) > at > > org.apache.kafka.streams.processor.TopologyBuilder.makeNodeGroups(TopologyBuilder.java:387) > at > > org.apache.kafka.streams.processor.TopologyBuilder.topicGroups(TopologyBuilder.java:339) > at > > org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:139) > at > > org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:120) > at org.apache.kafka.streams.KafkaStreaming.<init>(KafkaStreaming.java:110) > at bbejeck.ProcessorDriver.main(ProcessorDriver.java:35) > > The TopologyBuilder is being built like so: > topologyBuilder.addSource("SOURCE", new StringDeserializer(), new > StringDeserializer(), "src-topic") > .addProcessor("PROCESS", new > GenericProcessorClient(replaceVowels), "SOURCE") > .addSink("SINK", "dest-topic", new StringSerializer(), new > StringSerializer(), "PROCESS"); > > Looks to me the cause of the error is that in TopologyBuilder.addSink > method the sink is never connected with it's parent. > > When I added the following two lines to the addSink method, the Exception > goes away. > > nodeGrouper.add(name); > nodeGrouper.unite(name, parentNames); > > Is this a bug or am I doing something incorrect? > > Thanks, > Bill > -- -- Guozhang