[jira] [Commented] (KAFKA-13963) Topology Description ignores context.forward

2022-06-14 Thread Tomasz Kaszuba (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17553976#comment-17553976
 ] 

Tomasz Kaszuba commented on KAFKA-13963:


Created PR for the docs: https://github.com/apache/kafka/pull/12293/files

> Topology Description ignores context.forward
> 
>
> Key: KAFKA-13963
> URL: https://issues.apache.org/jira/browse/KAFKA-13963
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.2
>Reporter: Tomasz Kaszuba
>Priority: Minor
>
> I have a simple topology:
> {code:java}
>       val topology = new Topology
>       topology
>         .addSource("source", Serdes.stringSerde.deserializer, Serdes.stringSerde.deserializer, inputTopic)
>         .addProcessor(
>           "process",
>           new ProcessorSupplier[String, String] {
>             override def get(): Processor[String, String] =
>               new RecordCollectorProcessor()
>           },
>           "source"
>         ) {code}
> And a simple processor that uses context.forward to forward messages:
> {code:java}
>   private class ContextForwardProcessor extends AbstractProcessor[String, String]() {
>     override def process(key: String, value: String): Unit =
>       context().forward("key", "value", To.child("output"))
>     override def close(): Unit = ()
>   }  {code}
> when I call topology.describe() I receive this:
> {noformat}
> Topologies:
>    Sub-topology: 0
>     Source: source (topics: [input])
>       --> process
>     Processor: process (stores: [])
>       --> none
>       <-- source {noformat}
> Ignoring the fact that this will not run, since it will throw a runtime 
> exception, why is the To.child ignored?
> Taking it one step further, if I add multiple sinks to the topology like so:
> {code:java}
> val topology = new Topology
>       topology
>         .addSource("source", Serdes.stringSerde.deserializer, 
> Serdes.stringSerde.deserializer, inputTopic)
>         .addProcessor(
>           "process",
>           new ProcessorSupplier[String, String] {
>             override def get(): Processor[String, String] =
>               new ContextForwardProcessor()
>           },
>           "source"
>         )
>         .addSink("sink", "output1", Serdes.stringSerde.serializer(), Serdes.stringSerde.serializer(), "process")
>         .addSink("sink2", "output2", Serdes.stringSerde.serializer(), Serdes.stringSerde.serializer(), "process")  {code}
> but have the processor only output to "output1", this is in no way reflected 
> in the described topology graph.
> I assume this is by design, since it would be a lot more work to interpret 
> what context.forward is doing, but when I looked for this information in the 
> Javadoc I couldn't find it.
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13963) Topology Description ignores context.forward

2022-06-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17551333#comment-17551333
 ] 

Matthias J. Sax commented on KAFKA-13963:

{quote}Is it worth updating the java doc to mention this?
{quote}
Updating docs can never hurt :) – are you interested in doing a PR?
{quote}if you use the internal RecordCollector, which I feel should be better 
hidden from the streams api users
{quote}
Yes, you should NEVER use internal stuff... Not sure how we could "better hide" 
it though? Seems not to be possible as long as we are using Java 8...
{quote}I can open up a separate bug for that if it makes sense.
{quote}
Don't think it's a bug? It's (unfortunately) how Java works.



[jira] [Commented] (KAFKA-13963) Topology Description ignores context.forward

2022-06-07 Thread Tomasz Kaszuba (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17551289#comment-17551289
 ] 

Tomasz Kaszuba commented on KAFKA-13963:


OK, this is what I thought. Is it worth updating the Javadoc to mention this? 
The developers I work with were surprised that context.forward is not covered. 
We rely heavily on the generated topology graphs for impact analysis.

Btw, I think you can get around the context forward exception and the need for 
registering sinks if you use the internal RecordCollector, which I feel should 
be better hidden from the streams api users since it's a class cast exception 
waiting to happen. I can open up a separate bug for that if it makes sense.
{code:java}
collector = context.asInstanceOf[RecordCollector.Supplier].recordCollector 
{code}



[jira] [Commented] (KAFKA-13963) Topology Description ignores context.forward

2022-06-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17551271#comment-17551271
 ] 

Matthias J. Sax commented on KAFKA-13963:

TopologyDescription only describes the structure of your graph of operators. In 
your first example, you only added two nodes to the graph ("source" and 
"process") and there is no node "output", and thus it's not contained in the 
`TopologyDescription`.

It's not really possible to take the business logic (i.e., what `forward()` is 
doing) into account; at least I have no idea how this could be done with 
reasonable effort.

It's for sure not a bug. We should either close this ticket or change it into 
a feature request.
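
As a rough illustration of the point above (an editorial sketch, not code from the ticket): the description only changes when a node is registered on the Topology itself. The sketch assumes kafka-streams 2.7.x through 3.x on the classpath, where the older AbstractProcessor API used in the report is still available. The topic names "input" and "output-topic" and the object name DescribeExample are placeholders. It registers a sink named "output" under "process", so that To.child("output") refers to a child node that actually exists.

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.processor.{AbstractProcessor, Processor, ProcessorSupplier, To}

// Same shape as the processor in the report: it forwards every record
// to the child node named "output".
class ContextForwardProcessor extends AbstractProcessor[String, String] {
  override def process(key: String, value: String): Unit =
    context().forward(key, value, To.child("output"))
}

object DescribeExample {
  def main(args: Array[String]): Unit = {
    val strings = Serdes.String()

    val topology = new Topology
    topology
      .addSource("source", strings.deserializer(), strings.deserializer(), "input")
      .addProcessor(
        "process",
        new ProcessorSupplier[String, String] {
          override def get(): Processor[String, String] = new ContextForwardProcessor
        },
        "source"
      )
      // Registering a sink named "output" with parent "process" is what adds
      // the edge process --> output to the graph. The forward() call inside
      // the processor adds nothing to the description.
      .addSink("output", "output-topic", strings.serializer(), strings.serializer(), "process")

    // describe() only inspects the registered nodes; no broker is needed.
    println(topology.describe())
  }
}
```

With the sink registered, the printed description should list "output" as a successor of "process". Removing the addSink call brings back the "--> none" from the report, regardless of what forward() does inside the processor.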
