João, in your example, `k.toUpperCase()` does break partitioning. Assume you have two records <a,5> and <A,10>: they have different keys and might be stored in different partitions. After the `selectKey()`, both have the same key. To compute the aggregation correctly, the data must be re-partitioned so that both records <A,5> and <A,10> are processed together. Otherwise, the aggregation result would be incorrect.
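To make the <a,5> / <A,10> case concrete, here is a minimal sketch in plain Java with no Kafka dependency. The class `RepartitionSketch`, the method `partitionFor()`, and the choice of 3 partitions are assumptions made up for illustration; the hash-modulo partitioner is only a stand-in for Kafka's default partitioner, which hashes the serialized key bytes differently. The sketch only shows what happens when the key changes but the record stays in its original partition.

```java
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

class RepartitionSketch {
    // Hypothetical partition count, chosen so that "a" and "A" land in different partitions.
    static final int NUM_PARTITIONS = 3;

    // Stand-in partitioner (assumption): NOT Kafka's real default partitioner.
    static int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), NUM_PARTITIONS);
    }

    // Sums values per upper-cased key within each partition.
    // repartition == false: the record stays in the partition of its ORIGINAL key.
    // repartition == true:  the record is routed by its NEW key before aggregating.
    static Map<Integer, Map<String, Integer>> aggregate(String[] keys, int[] values, boolean repartition) {
        Map<Integer, Map<String, Integer>> perPartition = new TreeMap<>();
        for (int i = 0; i < keys.length; i++) {
            String newKey = keys[i].toUpperCase(Locale.ROOT);
            int partition = repartition ? partitionFor(newKey) : partitionFor(keys[i]);
            perPartition
                .computeIfAbsent(partition, p -> new TreeMap<>())
                .merge(newKey, values[i], Integer::sum);
        }
        return perPartition;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "A"};
        int[] values = {5, 10};
        // Two partial sums for the same key "A", held by different partitions.
        System.out.println("without re-partitioning: " + aggregate(keys, values, false));
        // One partition sees both records and produces a single sum.
        System.out.println("with re-partitioning:    " + aggregate(keys, values, true));
    }
}
```

Without re-partitioning, two tasks each hold a partial result for key A (5 and 10). With re-partitioning, a single task sees both records and computes 15.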
If and only if you know that all keys are in lowercase (or all keys are in uppercase), the re-partitioning would not be required. But Streams cannot know this and thus conservatively re-partitions to ensure correctness. Note that `toUpperCase()` would not make sense if all keys are in uppercase already. Furthermore, if all keys are in lowercase, you can compute the aggregation on the lowercase keys directly and convert the keys of the result into uppercase; this would allow you to avoid the re-partitioning topic. Does this make sense?

In general, you should use `selectKey()`, `map()`, etc. only if you need to set a new key and thus break partitioning. If you don't modify the key, you should use `mapValues()`, for example.

Nevertheless, there are still some cases in which the actual key must be modified before a key-based operation and the user wants to suppress re-partitioning because she knows that partitioning is preserved (cf. https://issues.apache.org/jira/browse/KAFKA-4835). This is currently not supported at the DSL level. However, you could fall back to the Processor API if this is really critical. In general, it seems to be a corner-case optimization though.

-Matthias

On 5/16/17 4:44 PM, João Peixoto wrote:
> Certain operations require a repartition topic, such as "selectKey" or
> "map". What purpose serves this repartition topic?
>
> Sample record: {"key": "a", ...}
>
> Stream: source.selectKey((k, v) -> KeyValue.pair(k.toUpperCase(),
> v)).groupByKey() //...
>
> From my understanding, the repartition topic will guarantee that if we are
> reading from partition N, the new key will be written to the same partition
> N on the repartition topic, which allows the stream task to always handle
> the same partition number all the way.
>
> This seems relevant if the topology above is followed by:
> /*...*/.toStream().leftJoin(kTable) //...
> We are still processing the same partition number. If the source stream and
> the kTable are co-partitioned, so will be the repartition topic.
>
> However in cases where there are no other operations in the topology like
> "joins", that repartition topic seems useless.
>
> There's a thread on this subject
> <http://mail-archives.apache.org/mod_mbox/kafka-users/201705.mbox/%3CCAJikTEUHR=r0ika6vlf_y+qajxg8f_q19og_-s+q-gozpqb...@mail.gmail.com%3E>,
> specific to topics with one partition only. The argument there is that
> repartition does not make sense on a topic with 1 partition only. However,
> even if you have multiple partitions but never join with anything else, it
> may not make sense for the reasons above.
>