João,

In your example, `k.toUpperCase()` does break partitioning. Assume you
have two records <a,5> and <A,10>. They have different keys and thus
might be stored in different partitions. After you do a selectKey(),
both have the same key "A". To compute the aggregation correctly, the
data must be re-partitioned so that both records <A,5> and <A,10> are
processed together by the same task. Otherwise, the aggregation result
would be incorrect: for a sum, for example, you would get two partial
results, A=5 and A=10, instead of A=15.
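
To make this concrete, here is a small plain-Java simulation (no Kafka
dependency) of the two records above spread over three partitions. It is
only a sketch under simplifying assumptions: `partitionFor()` is a
hypothetical stand-in for Kafka's default partitioner (which actually
hashes the serialized key with murmur2), and each inner list plays the
role of one partition that is processed by its own task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RepartitionDemo {
    // A key/value record such as <a,5>.
    record Rec(String key, int value) {}

    // Hypothetical stand-in for Kafka's default partitioner, which really
    // hashes the serialized key bytes with murmur2.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Route every record to the partition chosen by its current key.
    static List<List<Rec>> partitionByKey(List<Rec> records, int numPartitions) {
        List<List<Rec>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) parts.add(new ArrayList<>());
        for (Rec r : records) parts.get(partitionFor(r.key(), numPartitions)).add(r);
        return parts;
    }

    // Like selectKey(k.toUpperCase()): keys change, records stay where they are.
    static List<List<Rec>> upperCaseKeys(List<List<Rec>> parts) {
        List<List<Rec>> out = new ArrayList<>();
        for (List<Rec> p : parts) {
            List<Rec> q = new ArrayList<>();
            for (Rec r : p) q.add(new Rec(r.key().toUpperCase(), r.value()));
            out.add(q);
        }
        return out;
    }

    // Each task sums values per key, seeing only its own partition.
    static List<Map<String, Integer>> sumPerPartition(List<List<Rec>> parts) {
        List<Map<String, Integer>> results = new ArrayList<>();
        for (List<Rec> p : parts) {
            Map<String, Integer> sums = new TreeMap<>();
            for (Rec r : p) sums.merge(r.key(), r.value(), Integer::sum);
            results.add(sums);
        }
        return results;
    }

    static List<List<Rec>> input() {
        return partitionByKey(List.of(new Rec("a", 5), new Rec("A", 10)), 3);
    }

    static List<Map<String, Integer>> withoutRepartition() {
        return sumPerPartition(upperCaseKeys(input()));
    }

    static List<Map<String, Integer>> withRepartition() {
        // Flatten and re-route by the NEW key, as a repartition topic does.
        List<Rec> all = new ArrayList<>();
        upperCaseKeys(input()).forEach(all::addAll);
        return sumPerPartition(partitionByKey(all, 3));
    }

    public static void main(String[] args) {
        System.out.println(withoutRepartition()); // [{}, {A=5}, {A=10}]
        System.out.println(withRepartition());    // [{}, {}, {A=15}]
    }
}
```

Without the repartition step, two tasks each emit a partial sum for "A";
re-routing by the new key puts both records into one partition, and the
single task owning it computes A=15.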

Only if you know that all keys are in lowercase (or all keys are in
uppercase) would re-partitioning not be required. But Streams cannot
know this and thus conservatively re-partitions to ensure correctness.

Note that `toUpperCase()` would not make sense if all keys are already
in upper case. Furthermore, if all keys are in lower case, you can
compute the aggregation on the lower case keys directly and convert the
keys of the result into upper case. This allows you to avoid the
repartition topic.
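
A similar plain-Java sketch of aggregating first and re-keying
afterwards, again with the hypothetical `partitionFor()` stand-in. In
the DSL this would roughly correspond to `groupByKey()` plus an
aggregation, followed by `toStream().selectKey(...)` before writing the
result with `to(...)`; the simulation below only models the data flow,
not that API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class AggregateThenRekey {
    record Rec(String key, int value) {}

    // Hypothetical stand-in for Kafka's default (murmur2-based) partitioner.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    static List<List<Rec>> partitionByKey(List<Rec> records, int numPartitions) {
        List<List<Rec>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) parts.add(new ArrayList<>());
        for (Rec r : records) parts.get(partitionFor(r.key(), numPartitions)).add(r);
        return parts;
    }

    // Aggregate on the ORIGINAL keys inside each partition (no data moves),
    // then upper-case the keys of the per-partition results.
    static List<Rec> aggregateThenUpperCase(List<List<Rec>> parts) {
        List<Rec> emitted = new ArrayList<>();
        for (List<Rec> p : parts) {
            Map<String, Integer> local = new TreeMap<>();
            for (Rec r : p) local.merge(r.key(), r.value(), Integer::sum);
            local.forEach((k, v) -> emitted.add(new Rec(k.toUpperCase(), v)));
        }
        return emitted;
    }

    // All keys lowercase: each key's records sit in exactly one partition.
    static List<Rec> lowercaseOnly() {
        return aggregateThenUpperCase(partitionByKey(
            List.of(new Rec("a", 5), new Rec("b", 1), new Rec("a", 2)), 3));
    }

    // Mixed case: "a" and "A" land in different partitions here.
    static List<Rec> mixedCase() {
        return aggregateThenUpperCase(partitionByKey(
            List.of(new Rec("a", 5), new Rec("A", 10)), 3));
    }

    public static void main(String[] args) {
        System.out.println(lowercaseOnly()); // [Rec[key=A, value=7], Rec[key=B, value=1]]
        System.out.println(mixedCase());     // [Rec[key=A, value=5], Rec[key=A, value=10]]
    }
}
```

With lowercase-only input, each emitted total is already complete, so
re-keying afterwards is safe. With mixed-case input such as <a,5> and
<A,10>, the same code emits two partial results for "A", which is
exactly the case Streams guards against by re-partitioning.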

Does this make sense?

In general, you should use `selectKey()`, `map()`, etc. only if you
need to set a new key and thus break partitioning. If you don't modify
the key, you should use a value-only operation such as `mapValues()`.
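
Why Streams has to be conservative can be illustrated with a toy model.
The `Stream` class, the `keyMayHaveChanged` flag and the
"repartition-topic" marker below are hypothetical and are not the Kafka
Streams API. The point is only that a key mapper is an opaque function,
so a DSL has to assume that any `selectKey()` may move records to other
partitions, while `mapValues()` never can.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical toy model, NOT the Kafka Streams API: it only records which
// nodes a topology would contain; the mappers are never invoked.
public class RepartitionFlagModel {
    static final class Stream {
        final boolean keyMayHaveChanged;
        final List<String> topology;

        Stream(boolean keyMayHaveChanged, List<String> topology) {
            this.keyMayHaveChanged = keyMayHaveChanged;
            this.topology = topology;
        }

        // The mapper is opaque: the model cannot tell whether it preserves
        // partitioning, so it must assume the key changed.
        Stream selectKey(BiFunction<String, String, String> keyMapper) {
            return append("selectKey", true);
        }

        // A value-only mapper cannot change the key, so the flag carries over.
        Stream mapValues(Function<String, String> valueMapper) {
            return append("mapValues", keyMayHaveChanged);
        }

        // A key-based operation inserts a repartition step only if needed.
        Stream groupByKey() {
            Stream s = keyMayHaveChanged ? append("repartition-topic", true) : this;
            return s.append("groupByKey", false);
        }

        private Stream append(String node, boolean flag) {
            List<String> next = new ArrayList<>(topology);
            next.add(node);
            return new Stream(flag, next);
        }
    }

    static Stream source() {
        return new Stream(false, List.of("source"));
    }

    public static void main(String[] args) {
        System.out.println(source().selectKey((k, v) -> k.toUpperCase()).groupByKey().topology);
        // [source, selectKey, repartition-topic, groupByKey]
        System.out.println(source().mapValues(v -> v.toUpperCase()).groupByKey().topology);
        // [source, mapValues, groupByKey]
    }
}
```

Even a key mapper that happens to preserve partitioning is treated like
any other `selectKey()`, because the model (like Streams) never looks
inside the lambda.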


Nevertheless, there are some cases in which the key must be modified
before a key-based operation and the user wants to suppress
re-partitioning because she knows that partitioning is preserved (cf.
https://issues.apache.org/jira/browse/KAFKA-4835). This is currently not
supported at the DSL level. However, you could fall back to the Processor
API if this is really critical. In general, though, this seems to be a
corner-case optimization.



-Matthias


On 5/16/17 4:44 PM, João Peixoto wrote:
> Certain operations, such as "selectKey" or "map", require a repartition
> topic. What purpose does this repartition topic serve?
> 
> Sample record: {"key": "a", ...}
> 
> Stream: source.selectKey((k, v) -> k.toUpperCase()).groupByKey() //...
> 
> From my understanding, the repartition topic will guarantee that if we are
> reading from partition N, the new key will be written to the same partition
> N on the repartition topic, which allows the stream task to always handle
> the same partition number all the way.
> 
> This seems relevant if the topology above is followed by:
> /*...*/.toStream().leftJoin(kTable) //...
> We are still processing the same partition number. If the source stream and
> the kTable are co-partitioned, the repartition topic will be as well.
> 
> However, in cases where the topology contains no other operations such as
> joins, the repartition topic seems useless.
> 
> There's a thread on this subject
> <http://mail-archives.apache.org/mod_mbox/kafka-users/201705.mbox/%3CCAJikTEUHR=r0ika6vlf_y+qajxg8f_q19og_-s+q-gozpqb...@mail.gmail.com%3E>,
> specific to topics with one partition only. The argument there is that
> repartition does not make sense on a topic with 1 partition only. However,
> even if you have multiple partitions but never join with anything else, it
> may not make sense for the reasons above.
> 