Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

2016-12-01 Thread Hamidreza Afzali
I have added a KStreamTestDriver example to the GitHub Gist and updated the 
JIRA issue.

https://issues.apache.org/jira/browse/KAFKA-4461

https://gist.github.com/hrafzali/c2f50e7b957030dab13693eec1e49c13 


Hamid



Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

2016-11-29 Thread Hamidreza Afzali
I have created a JIRA issue:

https://issues.apache.org/jira/browse/KAFKA-4461


Hamid



Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

2016-11-24 Thread Hamidreza Afzali
Hi Damian,

It processes correctly when using KStreamTestDriver.

Best,
Hamid



Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

2016-11-24 Thread Hamidreza Afzali
The map() returns non-null keys and values and produces the following stream:

[KSTREAM-MAP-01]: A , 1
[KSTREAM-MAP-01]: A , 2
[KSTREAM-MAP-01]: B , 3

The issue arises when the combination of map() and groupByKey().count() is used 
with ProcessorTopologyTestDriver.

I have tried the topology on a local Kafka and got the expected result:

input: <"A-1", 1>, <"A-2", 2>, <"B-1", 3>
result: <"A":2>, <"B":1>.
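For reference, the expected final counts can be reproduced with plain Scala collections, without Kafka. This is only a sketch: the thread never shows `fn`, so the version below assumes it keeps the part of the key before the first '-' (which matches "A-1" becoming "A"). It models the final table state only, not the individual update records a KTable count may emit along the way.

```scala
object ExpectedCounts {
  // Hypothetical key function: the thread only calls it fn(k). Assumed here
  // to keep the prefix before the first '-', so "A-1" becomes "A".
  def fn(k: String): String = k.takeWhile(_ != '-')

  // Plain-collections model of .map(...).groupByKey(...).count(...):
  // re-key each record, group by the new key, count the records per key.
  def countByMappedKey(input: Seq[(String, Int)]): Map[String, Long] =
    input
      .map { case (k, v) => (fn(k), v) }
      .groupBy(_._1)
      .map { case (k, records) => k -> records.size.toLong }

  def main(args: Array[String]): Unit = {
    val input = Seq("A-1" -> 1, "A-2" -> 2, "B-1" -> 3)
    // Expected counts: A -> 2, B -> 1 (Map print order may vary)
    println(countByMappedKey(input))
  }
}
```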


Thanks,
Hamid



Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

2016-11-23 Thread Hamidreza Afzali
Thanks Matthias.

Disabling the cache didn't solve the issue. Here is some sample code:

https://gist.github.com/hrafzali/c2f50e7b957030dab13693eec1e49c13

The topology doesn't produce any result, but it works when .map(...) in line 21 
of the Gist is commented out.

Thanks,
Hamid



kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

2016-11-22 Thread Hamidreza Afzali
Hi,

When using ProcessorTopologyTestDriver in the latest Kafka 0.10.1, the 
combination of .map(...) and .groupByKey(...).count(...) does not produce any 
result.

The topology looks like this:

builder.stream(Serdes.String, Serdes.Integer, inputTopic)
 .map((k, v) => new KeyValue(fn(k), v))
 .groupByKey(Serdes.String, Serdes.Integer)
 .count(stateStore)

It works if we remove .map(...) or .groupByKey(...).count(...).

Is this a bug?

Thanks in advance,
Hamid



Re: Kafka 0.10.1 ProcessorTopologyTestDriver and WindowedStreamPartitioner issue

2016-10-04 Thread Hamidreza Afzali
Thanks Guozhang. I can confirm the issue is resolved.

Hamid



Re: Kafka 0.10.1 ProcessorTopologyTestDriver and WindowedStreamPartitioner issue

2016-10-03 Thread Hamidreza Afzali
Thanks Guozhang. We use ProcessorTopologyTestDriver for unit tests.

Hamid





Kafka 0.10.1 ProcessorTopologyTestDriver and WindowedStreamPartitioner issue

2016-09-28 Thread Hamidreza Afzali
Hi,

We are using the latest Kafka 0.10.1 branch. The combination of 
ProcessorTopologyTestDriver and WindowedStreamPartitioner results in a 
division-by-zero exception because the list of partitions is empty:

https://github.com/apache/kafka/blob/0.10.1/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java#L158
https://github.com/apache/kafka/blob/0.10.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java#L47
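The failure mode can be reproduced without Kafka. The sketch below is a simplified model of the hash-then-modulo step at the linked WindowedStreamPartitioner line; it uses String.hashCode in place of the murmur2 hash over the serialized windowed key (an assumption for brevity). When the driver supplies an empty partition list, the partition count is 0 and JVM integer modulo throws.

```scala
import scala.util.Try

object EmptyPartitionsDemo {
  // Simplified stand-in for the default windowed partitioning step:
  // hash the key, clear the sign bit, then take it modulo the partition count.
  // String.hashCode replaces Kafka's murmur2 hash here (assumption).
  def partition(key: String, numPartitions: Int): Int =
    (key.hashCode & 0x7fffffff) % numPartitions

  def main(args: Array[String]): Unit = {
    // With partitions available, the result is in 0 until numPartitions.
    println(partition("some-key", 4))
    // With an empty partition list the count is 0, and modulo by zero fails.
    println(Try(partition("some-key", 0))) // Failure(java.lang.ArithmeticException: / by zero)
  }
}
```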

Our topology looks similar to this:

  builder.stream("events")
    .groupByKey(...)
    .aggregate(...,
      TimeWindows.of(1 * 60 * 1000L)
    )
    .mapValues(_.size: Integer)
    .to(windowedSerde, Serdes.Integer(), "events-over-time")

If we use our own partitioner in .to() it works.

  class MyStreamPartitioner[K, V]() extends StreamPartitioner[K, V] {
    override def partition(k: K, v: V, numPartitions: Int): Integer = {
      // return an integer between 0 and numPartitions-1, or null if the
      // default partitioning logic should be used
      null
    }
  }
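A more defensive variant of this workaround could compute a partition only when the partition count is positive and otherwise return null, which, per the comment in MyStreamPartitioner, defers to the default partitioning logic. The helper below is hypothetical (partitionOrDefer is not a Kafka API) and is written as a plain function so it runs without Kafka on the classpath; the sign-bit masking mirrors the modulo-based partitioning discussed above.

```scala
object GuardedPartitioning {
  // Hypothetical helper: return a concrete partition when partitions are
  // known, or null to defer to the default partitioning logic, as the
  // MyStreamPartitioner workaround does unconditionally.
  def partitionOrDefer(keyHash: Int, numPartitions: Int): Integer =
    if (numPartitions <= 0) null
    else Integer.valueOf((keyHash & 0x7fffffff) % numPartitions)

  def main(args: Array[String]): Unit = {
    println(partitionOrDefer(42, 4)) // 2
    println(partitionOrDefer(42, 0)) // null: no modulo by zero is attempted
  }
}
```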

Is this a bug?

Thank you in advance,
Hamid