[
https://issues.apache.org/jira/browse/KAFKA-2872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15020507#comment-15020507
]
ASF GitHub Bot commented on KAFKA-2872:
---------------------------------------
GitHub user bbejeck opened a pull request:
https://github.com/apache/kafka/pull/572
KAFKA-2872 Fixed addSink method connecting sink with parent source(s)…
Starting a KafkaStream failed because the TopologyBuilder.addSink method
was not connecting the sink with its parent processor(s)/source(s). The fix
wires up the sink with its parent(s) in TopologyBuilder.addSink.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/bbejeck/kafka KAFKA-2872_kafka_stream_sink_not_connected_to_parent
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/kafka/pull/572.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #572
----
commit 514796557b904ed77441cc61393b7180eb046e86
Author: bbejeck <[email protected]>
Date: 2015-11-21T04:12:18Z
KAFKA-2872 Fixed addSink method connecting sink with parent source(s) or
parent processor(s)
----
> Error starting KafkaStream caused by sink not being connected to parent
> source/processor nodes
> ----------------------------------------------------------------------------------------------
>
> Key: KAFKA-2872
> URL: https://issues.apache.org/jira/browse/KAFKA-2872
> Project: Kafka
> Issue Type: Bug
> Components: kafka streams
> Affects Versions: 0.9.0.0
> Reporter: Bill Bejeck
> Assignee: Bill Bejeck
>
> When starting the KafkaStream I get the following Exception:
> Exception in thread "main" java.util.NoSuchElementException: id: SINK
>     at org.apache.kafka.streams.processor.internals.QuickUnion.root(QuickUnion.java:40)
>     at org.apache.kafka.streams.processor.TopologyBuilder.makeNodeGroups(TopologyBuilder.java:387)
>     at org.apache.kafka.streams.processor.TopologyBuilder.topicGroups(TopologyBuilder.java:339)
>     at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:139)
>     at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:120)
>     at org.apache.kafka.streams.KafkaStreaming.<init>(KafkaStreaming.java:110)
>     at bbejeck.ProcessorDriver.main(ProcessorDriver.java:35)
> The TopologyBuilder is being built like so:
> topologyBuilder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "src-topic")
>     .addProcessor("PROCESS", new GenericProcessorClient(replaceVowels), "SOURCE")
>     .addSink("SINK", "dest-topic", new StringSerializer(), new StringSerializer(), "PROCESS");
> It looks to me like the cause of the error is that the TopologyBuilder.addSink
> method never connects the sink with its parent.
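The failure mode in the stack trace can be illustrated with a small union-find sketch. This is not the Kafka source: the class SinkWiringSketch and its nested QuickUnionSketch are hypothetical names, and the assumption is only that node grouping looks up each node's root and throws NoSuchElementException for a node that was never registered. Under that assumption, registering the sink and uniting it with its parent is enough to make the lookup succeed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical sketch, not the Kafka source: a tiny union-find keyed by node name.
class SinkWiringSketch {

    static class QuickUnionSketch {
        private final Map<String, String> parent = new HashMap<>();

        // Register a node as the root of its own group.
        void add(String id) {
            parent.put(id, id);
        }

        // Follow parent links to the group root; unknown ids fail as in the report.
        String root(String id) {
            String current = id;
            String next = parent.get(current);
            if (next == null) {
                throw new NoSuchElementException("id: " + id);
            }
            while (!next.equals(current)) {
                current = next;
                next = parent.get(current);
            }
            return current;
        }

        // Merge the group of every other id into the group of id.
        void unite(String id, String... others) {
            String root = root(id);
            for (String other : others) {
                parent.put(root(other), root);
            }
        }
    }

    public static void main(String[] args) {
        QuickUnionSketch grouper = new QuickUnionSketch();
        grouper.add("SOURCE");
        grouper.add("PROCESS");
        grouper.unite("PROCESS", "SOURCE");

        // The sink was never registered, so the root lookup throws.
        try {
            grouper.root("SINK");
        } catch (NoSuchElementException e) {
            System.out.println("before wiring: " + e.getMessage());
        }

        // Registering the sink and uniting it with its parent joins the groups.
        grouper.add("SINK");
        grouper.unite("SINK", "PROCESS");
        System.out.println(grouper.root("SINK").equals(grouper.root("SOURCE")));
    }
}
```

Running main first reports the missing id, then shows that SINK and SOURCE share a root once the sink is wired to PROCESS.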