Hey,

if you need to request topic creation in advance, I would recommend to
do manual re-partitioning via through() -- this allows you to control
the topic names and should make your setup more robust.

Eg.

stream.selectKey().through("my-own-topic-name").groupByKey()...

For this case, Streams will not add an internal re-partitioning topic,
as the "re-partition required flag" gets cleared via `through()` and
thus `groupByKey()` knows that data is already partitioned correctly.



-Matthias

On 5/16/17 5:40 PM, João Peixoto wrote:
> Your explanation makes sense. If I understood correctly it means that one
> stream thread can actually generate records that will be aggregated in a
> different thread, based on the new partitioning.
> 
> I didn't think of that case, which now makes more sense.
> In my particular case the keys just get appended with some extra
> information, so I know there is no need for repartitioning. E.g. "mykey" >
> selectKey > "mykey:currentHour" (just an example, I'm not doing windowed
> operations).
> 
> The processor API is always a possibility, this is not causing any
> performance issues whatsoever. Our Kafka Brokers do not allow automatic
> creation of topics so I actually need to request the creation of these
> internal topics. I know it is not recommended as the naming convention is
> not guaranteed to remain the same in future releases, but the security
> configuration there is not final as of now.
> 
> Thanks for the info
> 
> On Tue, May 16, 2017 at 5:28 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> João,
>>
>> in your example, k.toUpperCase() does break partitioning. Assume you
>> have two records <a,5> and <A,10> -- both do have different keys and
>> might be contained in different partitions. After you do a selectKey(),
>> both do have the same key. In order to compute the aggregation
>> correctly, it is required to re-partition the data to make sure that
>> both records <A,5> and <A,10> are processed together. Otherwise, the
>> aggregation result would be incorrect.
>>
>> If and only if you know, that all key are in lowercase (or all keys are
>> in uppercase), the re-partitioning would not be required. But Streams
>> cannot know this and thus conservatively does re-partition to ensure
>> correctness.
>>
>> Note, that the `toUpperCase()` would not make sense if all keys are in
>> upper case already. Furthermore, if all keys are in lower case, you can
>> compute the aggregation on the lower case keys directly and convert the
>> keys of the result into upper case -- this would allow you to avoid the
>> re-partitioning topic.
>>
>> Does this make sense?
>>
>> In general, you should use `selectKey()`, `map()` etc only if you need
>> to set a new key and thus break partitioning. For you don't modify the
>> key, you should use `mapValues()` for example.
>>
>>
>> Nevertheless, there are still some cases, for which the actual key must
>> be modified before a key-based operation and user wants to suppress
>> re-partitioning as she knows that partitioning is preserved (cf.
>> https://issues.apache.org/jira/browse/KAFKA-4835). This is currently not
>> supported at DSL level. However, you could fall back to Processor API if
>> this is really critical. In general, it seems to be a corner case
>> optimization though.
>>
>>
>>
>> -Matthias
>>
>>
>> On 5/16/17 4:44 PM, João Peixoto wrote:
>>> Certain operations require a repartition topic, such as "selectKey" or
>>> "map". What purpose serves this repartition topic?
>>>
>>> Sample record: {"key": "a", ...}
>>>
>>> Stream: source.selectKey((k, v) -> KeyValue.pair(k.toUpperCase(),
>>> v)).groupByKey() //...
>>>
>>> From my understanding, the repartition topic will guarantee that if we
>> are
>>> reading from partition N, the new key will be written to the same
>> partition
>>> N on the repartition topic, which allows the stream task to always handle
>>> the same partition number all the way.
>>>
>>> This seems relevant if the topology above is followed by:
>>> /*...*/.toStream().leftJoin(kTable) //...
>>> We are still processing the same partition number. If the source stream
>> and
>>> the kTable are co-partitioned, so will be the repartition topic.
>>>
>>> However in cases where there are no other operations in the topology like
>>> "joins", that repartition topic seems useless.
>>>
>>> There's a thread on this subject
>>> <
>> http://mail-archives.apache.org/mod_mbox/kafka-users/201705.mbox/%3CCAJikTEUHR=r0ika6vlf_y+qajxg8f_q19og_-s+q-gozpqb...@mail.gmail.com%3E
>>> ,
>>> specific to topics with one partition only. The argument there is that
>>> repartition does not make sense on a topic with 1 partition only.
>> However,
>>> even if you have multiple partitions but never join with anything else,
>> it
>>> may not make sense for the reasons above.
>>>
>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to