[ 
https://issues.apache.org/jira/browse/KAFKA-7125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531130#comment-16531130
 ] 

Jouni commented on KAFKA-7125:
------------------------------

Thanks for your efforts. The exception has gone. Another thing is that my 
approach has already stopped working and my processors process method isn't 
called ... It was in some older version, can't remember exactly which one. So 
the Change<> probably isn't forwarded downstream anymore.

After reading your explanation I quite agree that my approach isn't the wisest 
one, especially adding a sink. But I have one real use case for adding a 
processor. I'm feeding the change event to an embedded per-instance ActiveMQ 
broker that serves browser clients over STOMP/WebSockets, and would like to 
somehow get notified for the occasional incoming change to the globalstore. So 
consider this bug report as feature request to be able to catch that event in 
some way instead.

> Calling StreamsBuilderbuilder.build().describe() causes 
> java.util.NoSuchElementException: null
> ----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7125
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7125
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Jouni
>            Assignee: Nikolay Izhikov
>            Priority: Minor
>              Labels: beginner, newbie
>
> After adding a a processor and a sink to topology after a globalstore and 
> then calling StreamBuilder.build().describe() again (for debugging purposes 
> and to check I got the topology right), had the following exception and 
> stacktrace:
> {{Caused by: java.util.NoSuchElementException: null}}
>  {{    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1444) 
> ~[na:1.8.0_171]}}
>  {{    at java.util.HashMap$KeyIterator.next(HashMap.java:1466) 
> ~[na:1.8.0_171]}}
>  {{    at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describeGlobalStore(InternalTopologyBuilder.java:1323)
>  ~[kafka-streams-1.1.0.jar:na]}}
>  {{    at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describe(InternalTopologyBuilder.java:1306)
>  ~[kafka-streams-1.1.0.jar:na]}}
>  {{    at org.apache.kafka.streams.Topology.describe(Topology.java:647) 
> ~[kafka-streams-1.1.0.jar:na]}}
> Snipped of code that caused this:
> {{        GlobalKTable<String, ServiceList> jsonRoutesToServices}}
>  {{                = builder.globalTable("routes-to-services",}}
>  {{                        Consumed.with(Serdes.String(), 
> jsonServiceListSerde),}}
>  {{                        Materialized.<String, ServiceList, 
> KeyValueStore<Bytes, byte[]>>as("routes-to-services"));}}
> {{        TopologyDescription td = builder.build().describe();}}
>  {{        String parent = null;}}
>  {{        // We get an iterator to a TreeSet sorted by processing order, and 
> just want the last one.}}
>  {{        for (TopologyDescription.GlobalStore store : td.globalStores()) {}}
>  {{            parent = store.processor().name();}}
>                  }
>  {{        TopologyDescription tdtd = builder.build().describe();}}
>  {{        builder.build().addProcessor("ROUTES-TO-SERVICES-FORWARDER", new 
> UnneededCruftSupplier(), parent);}}
>  {{        builder.build().addSink("FST-ROUTES-TO-SERVICES", 
> "fst-routes-to-services", Serdes.String().serializer(), 
> fstServiceListSerde.serializer(), "ROUTES-TO-SERVICES-FORWARDER");}}
>  {{        TopologyDescription tdtdtd = builder.build().describe();}}
> Note that the exception is thrown on the last line of the code snippet, 
> calling describe again before adding anything works fine.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to