Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count
I have added an example for KStreamTestDriver to the GitHub Gist and updated the JIRA issue.

https://issues.apache.org/jira/browse/KAFKA-4461
https://gist.github.com/hrafzali/c2f50e7b957030dab13693eec1e49c13

Hamid
Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count
I have created a JIRA issue: https://issues.apache.org/jira/browse/KAFKA-4461 Hamid
Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count
Hi Damian, It processes correctly when using KStreamTestDriver. Best, Hamid
Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count
The map() returns non-null keys and values and produces the following stream:

    [KSTREAM-MAP-01]: A , 1
    [KSTREAM-MAP-01]: A , 2
    [KSTREAM-MAP-01]: B , 3

The issue arises when the combination of map() and groupByKey().count() is used with ProcessorTopologyTestDriver. I have tried the topology on a local Kafka and got the expected result:

    input:  <"A-1", 1>, <"A-2", 2>, <"B-1", 3>
    result: <"A":2>, <"B":1>

Thanks,
Hamid
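For reference, the expected result quoted above can be reproduced with plain Scala collections, independent of Kafka. This is only a sketch of the intended semantics: the name keyPrefix is hypothetical and stands in for the unspecified fn in the topology, on the assumption that it strips everything from the first '-' onward (which matches the "A-1" -> "A" mapping shown in the stream output).

```scala
object ExpectedCounts {
  // Hypothetical stand-in for fn(k): keep the part of the key before the first '-'.
  def keyPrefix(key: String): String = key.takeWhile(_ != '-')

  // Mirrors map -> groupByKey -> count on an in-memory sequence of records.
  def countByMappedKey(records: Seq[(String, Int)]): Map[String, Long] =
    records
      .map { case (k, v) => (keyPrefix(k), v) }      // re-key each record
      .groupBy { case (k, _) => k }                  // group by the new key
      .map { case (k, vs) => k -> vs.size.toLong }   // count records per key
}
```

Calling ExpectedCounts.countByMappedKey on the three input records listed above yields the counts the topology is expected to produce, which gives a baseline to compare the test driver's output against.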
Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count
Thanks Matthias. Disabling the cache didn't solve the issue. Here is some sample code:

https://gist.github.com/hrafzali/c2f50e7b957030dab13693eec1e49c13

The topology doesn't produce any result, but it works when .map(...) on line 21 is commented out.

Thanks,
Hamid
kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count
Hi,

When using ProcessorTopologyTestDriver in the latest Kafka 0.10.1, the combination of .map(...) and .groupByKey(...).count(...) does not produce any result. The topology looks like this:

    builder.stream(Serdes.String, Serdes.Integer, inputTopic)
      .map((k, v) => new KeyValue(fn(k), v))
      .groupByKey(Serdes.String, Serdes.Integer)
      .count(stateStore)

It works if we remove .map(...) or .groupByKey(...).count(...). Is this a bug?

Thanks in advance,
Hamid
Re: Kafka 0.10.1 ProcessorTopologyTestDriver and WindowedStreamPartitioner issue
Thanks Guozhang. I can confirm the issue is resolved. Hamid
Re: Kafka 0.10.1 ProcessorTopologyTestDriver and WindowedStreamPartitioner issue
Thanks Guozhang. We use ProcessorTopologyTestDriver for unit tests.

Hamid

> On 28 Sep 2016, at 11:48 AM, Hamidreza Afzali wrote:
>
> Hi,
>
> We are using the latest Kafka 0.10.1 branch. The combination of ProcessorTopologyTestDriver and WindowedStreamPartitioner is resulting in a division by 0 exception because of the empty list of partitions:
>
> https://github.com/apache/kafka/blob/0.10.1/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java#L158
> https://github.com/apache/kafka/blob/0.10.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java#L47
>
> Our topology looks similar to this:
>
>     builder.stream("events")
>       .groupByKey(...)
>       .aggregate(...,
>         TimeWindows.of(1 * 60 * 1000L)
>       )
>       .mapValues(_.size: Integer)
>       .to(windowedSerde, Serdes.Integer(), "events-over-time")
>
> If we use our own partitioner in .to() it works.
>
>     class MyStreamPartitioner[K, V]() extends StreamPartitioner[K, V] {
>       override def partition(k: K, v: V, numPartitions: Int): Integer = {
>         // return an integer between 0 and numPartitions-1, or null if the default partitioning logic should be used
>         null
>       }
>     }
>
> Is this a bug?
>
> Thank you in advance,
> Hamid
Kafka 0.10.1 ProcessorTopologyTestDriver and WindowedStreamPartitioner issue
Hi,

We are using the latest Kafka 0.10.1 branch. The combination of ProcessorTopologyTestDriver and WindowedStreamPartitioner is resulting in a division by 0 exception because of the empty list of partitions:

https://github.com/apache/kafka/blob/0.10.1/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java#L158
https://github.com/apache/kafka/blob/0.10.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java#L47

Our topology looks similar to this:

    builder.stream("events")
      .groupByKey(...)
      .aggregate(...,
        TimeWindows.of(1 * 60 * 1000L)
      )
      .mapValues(_.size: Integer)
      .to(windowedSerde, Serdes.Integer(), "events-over-time")

If we use our own partitioner in .to() it works.

    class MyStreamPartitioner[K, V]() extends StreamPartitioner[K, V] {
      override def partition(k: K, v: V, numPartitions: Int): Integer = {
        // return an integer between 0 and numPartitions-1, or null if the default partitioning logic should be used
        null
      }
    }

Is this a bug?

Thank you in advance,
Hamid
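To illustrate the failure mode described above, here is a minimal sketch of why a hash-based partitioner fails when the partition count is 0. It is not the Kafka implementation: hashPartition and safePartition are hypothetical names, and the hash function is a simple stand-in. The only point is that a modulo by an empty partition count throws ArithmeticException on the JVM, and that a guard can turn it into an explicit "no partition" result.

```scala
object PartitionSketch {
  // Hypothetical hash-modulo partitioner (assumption: not the Kafka code).
  // Throws ArithmeticException when numPartitions is 0.
  def hashPartition(keyBytes: Array[Byte], numPartitions: Int): Int =
    (java.util.Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions

  // Guarded variant: returns None instead of dividing by zero.
  def safePartition(keyBytes: Array[Byte], numPartitions: Int): Option[Int] =
    if (numPartitions <= 0) None
    else Some(hashPartition(keyBytes, numPartitions))
}
```

Returning null from a custom partitioner, as in MyStreamPartitioner above, sidesteps the modulo in a similar way by deferring to the default partitioning logic.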