Re: Kafka Streams KTable-KTable Join Error
Thanks for validating!

Guozhang
Re: Kafka Streams KTable-KTable Join Error
Hi Guozhang,

Apologies for the delay - had to convert to the new groupBy. The fix got rid of the error.

Thanks!

Tim
Re: Kafka Streams KTable-KTable Join Error
Hello Tim,

I think I found the issue, could you apply the following patch and retry your app?

https://github.com/apache/kafka/pull/1520

Guozhang
Re: Kafka Streams KTable-KTable Join Error
Hello Tim,

By looking through the source code I suspect it is a bug in Kafka Streams' KStreamWindowReduce implementation. I'll do further investigation tomorrow and possibly file a JIRA with a patch.

Guozhang

On Thu, Jun 16, 2016 at 4:12 PM, Tim Renner wrote:

> Hi all,
>
> I'm trying to do a KTable-KTable join to compute an average within a
> tumbling window.
> Here's the KStreams code (I've put a fully working example in a gist:
> https://gist.github.com/timothyrenner/a99c86b2d6ed2c22c8703e8c7760af3a)
>
> KStreamBuilder builder = new KStreamBuilder();
>
> KStream<String, Long> longs = builder.stream(
>     Serdes.String(), Serdes.Long(), "longs");
>
> KTable<Windowed<String>, Long> longCounts =
>     longs.countByKey(TimeWindows.of("longCounts", 1L),
>                      Serdes.String());
>
> KTable<Windowed<String>, Long> longSums =
>     longs.reduceByKey((v1, v2) -> v1 + v2,
>                       TimeWindows.of("longSums", 1L),
>                       Serdes.String(),
>                       Serdes.Long());
>
> KTable<Windowed<String>, Double> longAvgs =
>     longSums.join(longCounts,
>                   (sum, count) ->
>                       sum.doubleValue() / count.doubleValue());
>
> longAvgs.toStream((wk, v) -> wk.key())
>         .to(Serdes.String(),
>             Serdes.Double(),
>             "long-avgs");
>
> KafkaStreams streams = new KafkaStreams(builder, config);
> streams.start();
>
> When I run this, I get the following exception:
>
> java.util.NoSuchElementException
>     at org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowStoreIterator.next(RocksDBWindowStore.java:95)
>     at org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowStoreIterator.next(RocksDBWindowStore.java:64)
>     at org.apache.kafka.streams.state.internals.MeteredWindowStore$MeteredWindowStoreIterator.next(MeteredWindowStore.java:136)
>     at org.apache.kafka.streams.state.internals.MeteredWindowStore$MeteredWindowStoreIterator.next(MeteredWindowStore.java:117)
>     at org.apache.kafka.streams.kstream.internals.KStreamWindowReduce$KStreamAggregateValueGetter.get(KStreamWindowReduce.java:166)
>     at org.apache.kafka.streams.kstream.internals.KStreamWindowReduce$KStreamAggregateValueGetter.get(KStreamWindowReduce.java:147)
>     at org.apache.kafka.streams.kstream.internals.KTableKTableJoin$KTableKTableJoinProcessor.process(KTableKTableJoin.java:77)
>     at org.apache.kafka.streams.kstream.internals.KTableKTableJoin$KTableKTableJoinProcessor.process(KTableKTableJoin.java:48)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>     at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:136)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
>     at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>
> Looks like the join is throwing the exception. Any ideas?
>
> Thanks,
> Tim
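
For readers following the original question, here is a minimal plain-Java sketch of the computation the join was meant to produce: a per-key average within a tumbling window, kept as one running (sum, count) pair per window instead of two tables joined afterwards. It does not use the Kafka Streams API and is not the change in the linked patch. The names `WindowedAverage`, `SumCount`, `windowStart`, and the `key@windowStart` map key are hypothetical, chosen only for illustration, and the sketch assumes non-negative millisecond timestamps.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WindowedAverage {

    /** Running sum and count for one (key, window) pair. Hypothetical helper. */
    static final class SumCount {
        long sum;
        long count;

        double average() {
            return (double) sum / count;
        }
    }

    /** Start of the tumbling window containing timestampMs (assumes timestampMs >= 0). */
    public static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    private final long windowSizeMs;
    private final Map<String, SumCount> state = new LinkedHashMap<>();

    public WindowedAverage(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Folds one record into its window and returns that window's updated average. */
    public double add(String key, long timestampMs, long value) {
        // Illustrative composite key: record key plus the window's start time.
        String windowedKey = key + "@" + windowStart(timestampMs, windowSizeMs);
        SumCount sc = state.computeIfAbsent(windowedKey, k -> new SumCount());
        sc.sum += value;
        sc.count += 1;
        return sc.average();
    }

    public static void main(String[] args) {
        WindowedAverage avg = new WindowedAverage(1000L);
        System.out.println(avg.add("a", 10L, 4L));   // first record in window [0, 1000)
        System.out.println(avg.add("a", 20L, 8L));   // same window, average updates
        System.out.println(avg.add("a", 1500L, 1L)); // falls into the next window
    }
}
```

Keeping sum and count together means there is never a moment where one side of the average exists for a window and the other does not, which is the situation a join between two separately maintained windowed tables has to handle.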