Hey, if you need to request topic creation in advance, I would recommend doing the re-partitioning manually via through() -- this allows you to control the topic names and should make your setup more robust.
E.g. stream.selectKey().through("my-own-topic-name").groupByKey()...

For this case, Streams will not add an internal re-partitioning topic, as the "re-partition required flag" gets cleared via `through()` and thus `groupByKey()` knows that data is already partitioned correctly.

-Matthias

On 5/16/17 5:40 PM, João Peixoto wrote:
> Your explanation makes sense. If I understood correctly it means that one
> stream thread can actually generate records that will be aggregated in a
> different thread, based on the new partitioning.
>
> I didn't think of that case, which now makes more sense.
> In my particular case the keys just get appended with some extra
> information, so I know there is no need for repartitioning.
> E.g. "mykey" > selectKey > "mykey:currentHour" (just an example, I'm not
> doing windowed operations).
>
> The processor API is always a possibility, this is not causing any
> performance issues whatsoever. Our Kafka Brokers do not allow automatic
> creation of topics so I actually need to request the creation of these
> internal topics. I know it is not recommended as the naming convention is
> not guaranteed to remain the same in future releases, but the security
> configuration there is not final as of now.
>
> Thanks for the info
>
> On Tue, May 16, 2017 at 5:28 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> João,
>>
>> in your example, k.toUpperCase() does break partitioning. Assume you
>> have two records <a,5> and <A,10> -- both do have different keys and
>> might be contained in different partitions. After you do a selectKey(),
>> both do have the same key. In order to compute the aggregation
>> correctly, it is required to re-partition the data to make sure that
>> both records <A,5> and <A,10> are processed together. Otherwise, the
>> aggregation result would be incorrect.
>>
>> If and only if you know that all keys are in lowercase (or all keys are
>> in uppercase), the re-partitioning would not be required. But Streams
>> cannot know this and thus conservatively does re-partition to ensure
>> correctness.
>>
>> Note that the `toUpperCase()` would not make sense if all keys are in
>> upper case already. Furthermore, if all keys are in lower case, you can
>> compute the aggregation on the lower case keys directly and convert the
>> keys of the result into upper case -- this would allow you to avoid the
>> re-partitioning topic.
>>
>> Does this make sense?
>>
>> In general, you should use `selectKey()`, `map()` etc. only if you need
>> to set a new key and thus break partitioning. If you don't modify the
>> key, you should use `mapValues()` for example.
>>
>> Nevertheless, there are still some cases for which the actual key must
>> be modified before a key-based operation and the user wants to suppress
>> re-partitioning as she knows that partitioning is preserved (cf.
>> https://issues.apache.org/jira/browse/KAFKA-4835). This is currently not
>> supported at DSL level. However, you could fall back to Processor API if
>> this is really critical. In general, it seems to be a corner case
>> optimization though.
>>
>> -Matthias
>>
>> On 5/16/17 4:44 PM, João Peixoto wrote:
>>> Certain operations require a repartition topic, such as "selectKey" or
>>> "map". What purpose does this repartition topic serve?
>>>
>>> Sample record: {"key": "a", ...}
>>>
>>> Stream: source.selectKey((k, v) -> KeyValue.pair(k.toUpperCase(),
>>> v)).groupByKey() //...
>>>
>>> From my understanding, the repartition topic will guarantee that if we
>>> are reading from partition N, the new key will be written to the same
>>> partition N on the repartition topic, which allows the stream task to
>>> always handle the same partition number all the way.
>>>
>>> This seems relevant if the topology above is followed by:
>>> /*...*/.toStream().leftJoin(kTable) //...
>>> We are still processing the same partition number. If the source stream
>>> and the kTable are co-partitioned, so will be the repartition topic.
>>>
>>> However in cases where there are no other operations in the topology like
>>> "joins", that repartition topic seems useless.
>>>
>>> There's a thread on this subject
>>> <http://mail-archives.apache.org/mod_mbox/kafka-users/201705.mbox/%3CCAJikTEUHR=r0ika6vlf_y+qajxg8f_q19og_-s+q-gozpqb...@mail.gmail.com%3E>,
>>> specific to topics with one partition only. The argument there is that
>>> repartition does not make sense on a topic with 1 partition only.
>>> However, even if you have multiple partitions but never join with
>>> anything else, it may not make sense for the reasons above.
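
The <a,5> / <A,10> example from the quoted reply can be illustrated with a small, self-contained Java sketch. It does not use Kafka Streams at all. `partitionFor` is a hypothetical stand-in partitioner based on `String.hashCode()` (an assumption for illustration only; Kafka's default partitioner hashes the serialized key bytes instead), and the class and method names are made up here. With 3 partitions under this stand-in, "a" and "A" land in different partitions, so aggregating per partition after upper-casing the key gives two partial results for "A", while routing by the new key first gives a single result.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

public class RepartitionSketch {

    // Hypothetical stand-in partitioner (assumption: hashCode modulo partition count).
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Upper-cases each key and sums values per key, keeping one result map per partition.
    // repartition == false: a record stays in the partition chosen by its ORIGINAL key.
    // repartition == true:  a record is re-routed by its NEW key before being aggregated.
    static List<Map<String, Integer>> sumPerPartition(
            List<Map.Entry<String, Integer>> records, int numPartitions, boolean repartition) {
        List<Map<String, Integer>> sums = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            sums.add(new HashMap<>());
        }
        for (Map.Entry<String, Integer> rec : records) {
            String newKey = rec.getKey().toUpperCase(Locale.ROOT);
            int partition = repartition
                    ? partitionFor(newKey, numPartitions)
                    : partitionFor(rec.getKey(), numPartitions);
            sums.get(partition).merge(newKey, rec.getValue(), Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records =
                List.of(Map.entry("a", 5), Map.entry("A", 10));
        // Without re-routing: "A" shows up in two partitions with partial sums.
        System.out.println(sumPerPartition(records, 3, false));
        // With re-routing by the new key: one partition holds the complete sum.
        System.out.println(sumPerPartition(records, 3, true));
    }
}
```

Under the stand-in partitioner, the first call prints `[{}, {A=5}, {A=10}]` and the second prints `[{}, {}, {A=15}]`, which is the difference between an incorrect and a correct aggregation described above.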