Bill,

Thanks for pointing it out! I think this is a real bug. Do you want to file a
PR on GitHub with the fix? You can find the instructions here:

https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes

Guozhang

On Fri, Nov 20, 2015 at 8:32 AM, Bill Bejeck <bbej...@gmail.com> wrote:

> Hi All,
>
> I'm starting to experiment with the lower-level Processor Client API found
> on the KIP-28 wiki.
>
> When starting KafkaStreaming, I get the following exception:
>
> Exception in thread "main" java.util.NoSuchElementException: id: SINK
>     at org.apache.kafka.streams.processor.internals.QuickUnion.root(QuickUnion.java:40)
>     at org.apache.kafka.streams.processor.TopologyBuilder.makeNodeGroups(TopologyBuilder.java:387)
>     at org.apache.kafka.streams.processor.TopologyBuilder.topicGroups(TopologyBuilder.java:339)
>     at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:139)
>     at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:120)
>     at org.apache.kafka.streams.KafkaStreaming.<init>(KafkaStreaming.java:110)
>     at bbejeck.ProcessorDriver.main(ProcessorDriver.java:35)
>
> The TopologyBuilder is being built like so:
> topologyBuilder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "src-topic")
>                 .addProcessor("PROCESS", new GenericProcessorClient(replaceVowels), "SOURCE")
>                 .addSink("SINK", "dest-topic", new StringSerializer(), new StringSerializer(), "PROCESS");
>
> It looks to me like the cause of the error is that the TopologyBuilder.addSink
> method never connects the sink with its parent.
>
> When I added the following two lines to the addSink method, the exception
> went away.
>
>   nodeGrouper.add(name);
>   nodeGrouper.unite(name, parentNames);
>
> Is this a bug, or am I doing something incorrectly?
>
> Thanks,
> Bill
>
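
For anyone following the thread, the failure mode Bill describes can be illustrated with a minimal union-find sketch. This is a simplified stand-in written for illustration, not the actual Kafka QuickUnion source: the class name SimpleQuickUnion and its internals are assumptions, and only the add/unite/root method names and the "id: SINK" message are taken from the message above. It shows that asking for the root of a node that was never added throws NoSuchElementException, and that adding the node and uniting it with its parents avoids it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical, simplified union-find; NOT the real Kafka QuickUnion class.
class SimpleQuickUnion<T> {
    // Maps each registered id to its parent; a root maps to itself.
    private final Map<T, T> parents = new HashMap<>();

    // Registers an id as its own single-element group (no-op if already present).
    void add(T id) {
        parents.putIfAbsent(id, id);
    }

    // Follows parent links up to the group's root.
    // Throws if the id was never registered via add().
    T root(T id) {
        T current = id;
        T parent = parents.get(current);
        if (parent == null) {
            throw new NoSuchElementException("id: " + id);
        }
        while (!parent.equals(current)) {
            current = parent;
            parent = parents.get(current);
        }
        return current;
    }

    // Merges the group containing id with the group of each of the other ids.
    @SafeVarargs
    final void unite(T id, T... others) {
        for (T other : others) {
            T r1 = root(id);
            T r2 = root(other);
            if (!r1.equals(r2)) {
                parents.put(r1, r2);
            }
        }
    }
}
```

With this sketch, registering "SOURCE" and "PROCESS" but not "SINK" and then calling root("SINK") throws the exception; calling add("SINK") followed by unite("SINK", "PROCESS") first places all three nodes in one group, which mirrors the two-line change proposed above.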



-- 
-- Guozhang
