Nikki Thean created KAFKA-7055:
----------------------------------
Summary: Kafka Streams Processor API allows you to add sinks and
processors without parent
Key: KAFKA-7055
URL: https://issues.apache.org/jira/browse/KAFKA-7055
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 1.0.0
Reporter: Nikki Thean
Assignee: Nikki Thean
The Kafka Streams Processor API allows you to define a Topology and connect
sources, processors, and sinks. From reading through the code, it seems that
you cannot forward a message to a downstream node unless it is explicitly
connected to the upstream node (from which you are forwarding the message) as a
child.
([example|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117],
where you forward using the name of the downstream node rather than a child index.)
However, I've been able to connect processors and sinks to the topology without
including parent names, i.e., with an empty vararg (using [this
method|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423]).
As any attempt to forward a message to those nodes will throw a
StreamsException, I suggest throwing an exception if a processor or sink is
added without at least one upstream node. There is a method in
`InternalTopologyBuilder` that allows you to connect processors by name after
you add them to the topology, but it is not part of the external Processor API.
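To illustrate the suggested check, here is a minimal, self-contained sketch. It does not use the real Kafka Streams classes: `ToyTopology` and its methods are hypothetical stand-ins, and `IllegalArgumentException` is a placeholder for whatever exception type the project would choose. The only point is that a processor or sink added with an empty parent vararg is rejected when it is added rather than failing later on forward.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of a topology builder; NOT the Kafka Streams API.
class ToyTopology {
    // Node name -> names of its parents (sources have no parents).
    private final Map<String, List<String>> parentsByNode = new LinkedHashMap<>();

    ToyTopology addSource(String name) {
        requireUnused(name);
        parentsByNode.put(name, Collections.<String>emptyList());
        return this;
    }

    ToyTopology addProcessor(String name, String... parentNames) {
        return addChild("Processor", name, parentNames);
    }

    ToyTopology addSink(String name, String... parentNames) {
        return addChild("Sink", name, parentNames);
    }

    // Returns the recorded parents of a node, or null if the node is unknown.
    List<String> parentsOf(String name) {
        return parentsByNode.get(name);
    }

    private ToyTopology addChild(String kind, String name, String[] parentNames) {
        requireUnused(name);
        // The check proposed in this ticket: at least one upstream node is required.
        if (parentNames == null || parentNames.length == 0) {
            throw new IllegalArgumentException(
                kind + " " + name + " must have at least one parent");
        }
        // Every named parent must already be part of the topology.
        for (String parent : parentNames) {
            if (!parentsByNode.containsKey(parent)) {
                throw new IllegalArgumentException(
                    "Parent " + parent + " of " + name + " has not been added yet");
            }
        }
        parentsByNode.put(name, new ArrayList<>(Arrays.asList(parentNames)));
        return this;
    }

    private void requireUnused(String name) {
        if (parentsByNode.containsKey(name)) {
            throw new IllegalArgumentException("Node " + name + " is already added");
        }
    }
}
```

With this sketch, `new ToyTopology().addSource("source").addProcessor("orphan")` fails immediately, while `addProcessor("proc", "source")` is accepted.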
In addition (or alternatively), I suggest making [the error message for when
users try to forward messages to a node that is not
connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
more descriptive, like [this one for when a user attempts to access a state
store that is not connected to the
processor|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81].
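As a rough idea of what a more descriptive message could look like, here is a small standalone sketch. The class `ToyForwarder`, its method names, and the message wording are all hypothetical and are not taken from `ProcessorContextImpl`. It only shows the shape of an error that names the current node, the requested child, and the children that are actually connected.

```java
import java.util.List;

// Hypothetical helper; NOT the actual ProcessorContextImpl logic.
class ToyForwarder {
    // Builds an error message that tells the user how to fix the topology.
    static String unknownChildMessage(String currentNode, String childName, List<String> children) {
        return "Processor " + currentNode + " cannot forward to " + childName
            + " because " + childName + " is not connected as a child of " + currentNode
            + ". Connected children: " + children
            + ". Make sure to pass " + currentNode
            + " as a parent name when adding " + childName + " to the topology.";
    }

    // Resolves a child by name, failing with the descriptive message if it is not connected.
    static int childIndex(String currentNode, List<String> children, String childName) {
        int index = children.indexOf(childName);
        if (index < 0) {
            throw new IllegalStateException(unknownChildMessage(currentNode, childName, children));
        }
        return index;
    }
}
```

Calling `ToyForwarder.childIndex("proc", children, "orphanSink")` with a `children` list that lacks `orphanSink` raises an error that points the user at the missing parent name.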
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)