That's a great tip! Thank you
On Wed, May 17, 2017 at 9:38 AM Matthias J. Sax <matth...@confluent.io>
wrote:

> Hey,
>
> if you need to request topic creation in advance, I would recommend doing
> manual re-partitioning via through() -- this allows you to control
> the topic names and should make your setup more robust.
>
> Eg.
>
> stream.selectKey().through("my-own-topic-name").groupByKey()...
>
> In this case, Streams will not add an internal re-partitioning topic,
> as the "re-partition required flag" gets cleared via `through()` and
> thus `groupByKey()` knows that data is already partitioned correctly.
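>
> A minimal sketch of that pattern, assuming the 0.10.2-era DSL and made-up
> names for the input topic and the count store ("my-own-topic-name" has to
> be created up front):
>
> KStreamBuilder builder = new KStreamBuilder();
> KStream<String, String> source = builder.stream("input-topic");
>
> source.selectKey((key, value) -> key.toUpperCase()) // sets the "re-partition required" flag
>       .through("my-own-topic-name")                 // manual re-partitioning via the pre-created topic
>       .groupByKey()                                 // flag already cleared, no internal topic added
>       .count("counts");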
>
>
>
> -Matthias
>
> On 5/16/17 5:40 PM, João Peixoto wrote:
> > Your explanation makes sense. If I understood correctly, it means that one
> > stream thread can actually generate records that will be aggregated in a
> > different thread, based on the new partitioning.
> >
> > I didn't think of that case, which now makes more sense.
> > In my particular case the keys just get appended with some extra
> > information, so I know there is no need for repartitioning. E.g. "mykey"
> > -> selectKey -> "mykey:currentHour" (just an example, I'm not doing
> > windowed operations).
> >
> > The Processor API is always a possibility; this is not causing any
> > performance issues whatsoever. Our Kafka brokers do not allow automatic
> > creation of topics, so I actually need to request the creation of these
> > internal topics. I know it is not recommended, as the naming convention is
> > not guaranteed to remain the same in future releases, but the security
> > configuration there is not final as of now.
> >
> > Thanks for the info
> >
> > On Tue, May 16, 2017 at 5:28 PM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> João,
> >>
> >> in your example, k.toUpperCase() does break partitioning. Assume you
> >> have two records <a,5> and <A,10> -- both have different keys and
> >> might be contained in different partitions. After you do a selectKey(),
> >> both have the same key. In order to compute the aggregation
> >> correctly, it is required to re-partition the data to make sure that
> >> both records <A,5> and <A,10> are processed together. Otherwise, the
> >> aggregation result would be incorrect.
> >>
> >> If and only if you know that all keys are in lowercase (or all keys are
> >> in uppercase), the re-partitioning would not be required. But Streams
> >> cannot know this and thus conservatively does re-partition to ensure
> >> correctness.
> >>
> >> Note that the `toUpperCase()` would not make sense if all keys are in
> >> upper case already. Furthermore, if all keys are in lower case, you can
> >> compute the aggregation on the lower case keys directly and convert the
> >> keys of the result into upper case -- this would allow you to avoid the
> >> re-partitioning topic.
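> >>
> >> A minimal sketch of that alternative, assuming `source` is the input
> >> KStream, all keys are already lower case, and the store/output topic
> >> names ("counts", "output-topic") are made up:
> >>
> >> KTable<String, Long> counts = source.groupByKey() // key unchanged, no re-partition topic
> >>                                     .count("counts");
> >> counts.toStream()
> >>       .selectKey((key, count) -> key.toUpperCase()) // convert keys of the result only
> >>       .to(Serdes.String(), Serdes.Long(), "output-topic");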
> >>
> >> Does this make sense?
> >>
> >> In general, you should use `selectKey()`, `map()`, etc. only if you need
> >> to set a new key and thus break partitioning. If you don't modify the
> >> key, you should use `mapValues()` for example.
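> >>
> >> For example, a sketch with a made-up value transformation that leaves
> >> the key untouched:
> >>
> >> source.mapValues(value -> value.toLowerCase()) // does not set the re-partition flag
> >>       .groupByKey()                            // hence no re-partition topic is created
> >>       .count("counts");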
> >>
> >>
> >> Nevertheless, there are still some cases for which the actual key must
> >> be modified before a key-based operation and the user wants to suppress
> >> re-partitioning because she knows that partitioning is preserved (cf.
> >> https://issues.apache.org/jira/browse/KAFKA-4835). This is currently not
> >> supported at DSL level. However, you could fall back to the Processor API
> >> if this is really critical. In general, it seems to be a corner-case
> >> optimization though.
> >>
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 5/16/17 4:44 PM, João Peixoto wrote:
> >>> Certain operations, such as "selectKey" or "map", require a repartition
> >>> topic. What purpose does this repartition topic serve?
> >>>
> >>> Sample record: {"key": "a", ...}
> >>>
> >>> Stream: source.selectKey((k, v) -> k.toUpperCase()).groupByKey() //...
> >>>
> >>> From my understanding, the repartition topic will guarantee that if we are
> >>> reading from partition N, the new key will be written to the same partition
> >>> N on the repartition topic, which allows the stream task to always handle
> >>> the same partition number all the way.
> >>>
> >>> This seems relevant if the topology above is followed by:
> >>> /*...*/.toStream().leftJoin(kTable) //...
> >>> We are still processing the same partition number. If the source stream and
> >>> the kTable are co-partitioned, so will be the repartition topic.
> >>>
> >>> However, in cases where there are no other operations in the topology, like
> >>> "joins", that repartition topic seems useless.
> >>>
> >>> There's a thread on this subject
> >>> <http://mail-archives.apache.org/mod_mbox/kafka-users/201705.mbox/%3CCAJikTEUHR=r0ika6vlf_y+qajxg8f_q19og_-s+q-gozpqb...@mail.gmail.com%3E>,
> >>> specific to topics with one partition only. The argument there is that
> >>> repartition does not make sense on a topic with 1 partition only. However,
> >>> even if you have multiple partitions but never join with anything else, it
> >>> may not make sense for the reasons above.
> >>>
> >>
> >>
> >
>
>
