[ 
https://issues.apache.org/jira/browse/KAFKA-7125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16530386#comment-16530386
 ] 

Jouni commented on KAFKA-7125:
------------------------------

Thanks Guozhang. I will try it when the PR is ready. Although I must first set 
up my development environment so that I can actually compile Kafka... and make 
sure that it works also with Spring framework. It's late night here where I 
live, and there's need for some sleep after several hours of coding and having 
a "few" beers later, so don't expect a quick answer.

I have also found a workaround for my case. It involves a few custom 
partitioners on the producer side so correlated data will end up on the right 
place. I used a globaltable just because my joins need a keyvaluemapper, the 
lookup key must be constructed from data in another incoming stream. Instead of 
using a globaltable, I can then just subscribe to a stream, do my 
not-streams-related work doing a dummy aggregation resulting to a table, and 
for the joins, just write my own implementations of 
transformersupplier/transformer and get the data from a state store. Oh I wish 
there would be stream-to-table joins with keyvaluemapper!

Haven't yet written the code for my workaround, so I'll test your PR first, 
when it's ready. Probably will end up implementing my workaround later anyway, 
should be faster, and performance will become very important in the future.

> Calling StreamsBuilderbuilder.build().describe() causes 
> java.util.NoSuchElementException: null
> ----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7125
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7125
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Jouni
>            Assignee: Nikolay Izhikov
>            Priority: Minor
>              Labels: beginner, newbie
>
> After adding a a processor and a sink to topology after a globalstore and 
> then calling StreamBuilder.build().describe() again (for debugging purposes 
> and to check I got the topology right), had the following exception and 
> stacktrace:
> {{Caused by: java.util.NoSuchElementException: null}}
>  {{    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1444) 
> ~[na:1.8.0_171]}}
>  {{    at java.util.HashMap$KeyIterator.next(HashMap.java:1466) 
> ~[na:1.8.0_171]}}
>  {{    at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describeGlobalStore(InternalTopologyBuilder.java:1323)
>  ~[kafka-streams-1.1.0.jar:na]}}
>  {{    at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.describe(InternalTopologyBuilder.java:1306)
>  ~[kafka-streams-1.1.0.jar:na]}}
>  {{    at org.apache.kafka.streams.Topology.describe(Topology.java:647) 
> ~[kafka-streams-1.1.0.jar:na]}}
> Snipped of code that caused this:
> {{        GlobalKTable<String, ServiceList> jsonRoutesToServices}}
>  {{                = builder.globalTable("routes-to-services",}}
>  {{                        Consumed.with(Serdes.String(), 
> jsonServiceListSerde),}}
>  {{                        Materialized.<String, ServiceList, 
> KeyValueStore<Bytes, byte[]>>as("routes-to-services"));}}
> {{        TopologyDescription td = builder.build().describe();}}
>  {{        String parent = null;}}
>  {{        // We get an iterator to a TreeSet sorted by processing order, and 
> just want the last one.}}
>  {{        for (TopologyDescription.GlobalStore store : td.globalStores()) {}}
>  {{            parent = store.processor().name();}}
>                  }
>  {{        TopologyDescription tdtd = builder.build().describe();}}
>  {{        builder.build().addProcessor("ROUTES-TO-SERVICES-FORWARDER", new 
> UnneededCruftSupplier(), parent);}}
>  {{        builder.build().addSink("FST-ROUTES-TO-SERVICES", 
> "fst-routes-to-services", Serdes.String().serializer(), 
> fstServiceListSerde.serializer(), "ROUTES-TO-SERVICES-FORWARDER");}}
>  {{        TopologyDescription tdtdtd = builder.build().describe();}}
> Note that the exception is thrown on the last line of the code snippet, 
> calling describe again before adding anything works fine.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to