[ https://issues.apache.org/jira/browse/KAFKA-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Boyang Chen resolved KAFKA-4835.
--------------------------------
    Resolution: Fixed

> Allow users control over repartitioning
> ---------------------------------------
>
>                 Key: KAFKA-4835
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4835
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Michal Borowiecki
>            Priority: Major
>              Labels: needs-kip
>
> From https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030
> ...it would be good to provide users more control over the repartitioning.
> My use case is as follows (unrelated bits omitted for brevity):
> {code}
> KTable<String, Activity> loggedInCustomers = builder
>     .stream("customerLogins")
>     .groupBy((key, activity) ->
>         activity.getCustomerRef())
>     .reduce((first,second) -> second, loginStore());
>
> builder
>     .stream("balanceUpdates")
>     .map((key, activity) -> new KeyValue<>(
>         activity.getCustomerRef(),
>         activity))
>     .join(loggedInCustomers, (activity, session) -> ...
>     .to("sessions");
> {code}
> Both "groupBy" and "map" in the underlying implementation set the
> repartitionRequired flag (since the key changes), and the aggregation/join
> that follows will create the repartitioned topic.
> However, in our case I know that both input streams are already partitioned
> by the customerRef value, which I'm mapping into the key (because it's
> required by the join operation).
> So there are 2 unnecessary intermediate topics created with their associated
> overhead, while the ultimate goal is simply to do a join on a value that we
> already use to partition the original streams anyway.
> (Note, we don't have the option to re-implement the original input streams to
> make customerRef the message key.)
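The claim that the two intermediate topics are unnecessary can be checked with a small simulation. This is a minimal sketch, not Kafka code, under hypothetical assumptions: the upstream producer picked each record's partition from customerRef, and String.hashCode() modulo the partition count stands in for the partitioner (Kafka's default producer partitioner applies murmur2 to the serialized key). The names CoPartitionCheck, Update, partitionFor and recordsThatWouldMove are illustrative only. The sketch counts how many records a re-key plus repartition would actually move.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch, not Kafka code. Hypothetical assumptions:
// - partitionFor() stands in for the producer's partitioner (Kafka's default
//   hashes the serialized key with murmur2, not String.hashCode()).
// - The upstream producer chose each record's partition from customerRef,
//   even though the record key is something else.
public class CoPartitionCheck {

    // An input record: its original key, the customerRef in its value,
    // and the partition it was written to.
    record Update(String originalKey, String customerRef, int partition) {}

    // Stand-in partitioner: non-negative hash of the routing key modulo the partition count.
    static int partitionFor(String routingKey, int numPartitions) {
        return Math.floorMod(routingKey.hashCode(), numPartitions);
    }

    // Counts records whose current partition differs from the one they would be
    // sent to if re-keyed by customerRef and written through a repartition topic.
    static long recordsThatWouldMove(List<Update> updates, int numPartitions) {
        return updates.stream()
                .filter(u -> u.partition() != partitionFor(u.customerRef(), numPartitions))
                .count();
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        String[] refs = {"cust-1", "cust-2", "cust-3", "cust-1"};
        List<Update> updates = new ArrayList<>();
        for (int i = 0; i < refs.length; i++) {
            // Routed by customerRef, keyed by something else (as in "balanceUpdates").
            updates.add(new Update("update-" + i, refs[i], partitionFor(refs[i], numPartitions)));
        }
        System.out.println(recordsThatWouldMove(updates, numPartitions)); // prints 0
    }
}
```

A result of 0 means every record already sits in the partition its new key maps to, so the repartition topic would copy data without relocating any of it. The saving is only real if the upstream producer and the repartition step agree on serialization, hash function and partition count; the same reasoning applies to the customerLogins stream before groupBy.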
> I think it would be better to allow the user to decide (from their knowledge
> of the incoming streams) whether a repartition is mandatory on aggregation
> and join operations (overloaded versions of the methods with the
> repartitionRequired flag exposed, maybe?)
> An alternative would be to allow users to perform a join on a value other
> than the key (a keyValueMapper parameter to join, like the one used for joins
> with global tables), but I expect that to be more involved and error-prone to
> use for people who don't understand the partitioning requirements well
> (whereas it's safe for global tables).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
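The trade-off in this proposal, and the warning that a value-based join is error-prone, can be illustrated with a toy model. Everything in the sketch is hypothetical: ToyPartitionedJoin, joinRekeyed and its repartitionRequired parameter only mimic the idea of an overload with the flag exposed and are not the Kafka Streams API. The model joins each stream partition only against the same-numbered table partition, as a stream task would, and keyMapper plays the role of the map() that moves customerRef into the key. Skipping the repartition is harmless when the input is already partitioned by customerRef, but silently drops matches when it is not.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Toy model, NOT the Kafka Streams API: joinRekeyed and repartitionRequired are hypothetical.
public class ToyPartitionedJoin {

    // A balance update: original key, the customerRef carried in the value, and a payload.
    record Update(String key, String customerRef, String payload) {}

    // Stand-in partitioner (Kafka's default uses murmur2 over the serialized key).
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Builds a table partitioned by customerRef, like the loggedInCustomers KTable.
    static List<Map<String, String>> sessionTable(Map<String, String> sessions, int numPartitions) {
        List<Map<String, String>> table = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) table.add(new HashMap<>());
        sessions.forEach((ref, s) -> table.get(partitionFor(ref, numPartitions)).put(ref, s));
        return table;
    }

    // Re-keys each update with keyMapper. If repartitionRequired, each record is first
    // routed to the partition of its new key (the job of a repartition topic); otherwise
    // it stays where it is. Then an inner join runs partition by partition.
    static List<String> joinRekeyed(List<List<Update>> stream, List<Map<String, String>> table,
                                    Function<Update, String> keyMapper, boolean repartitionRequired) {
        int n = stream.size();
        List<List<Update>> input = new ArrayList<>();
        for (int i = 0; i < n; i++) input.add(new ArrayList<>());
        for (int i = 0; i < n; i++) {
            for (Update u : stream.get(i)) {
                String newKey = keyMapper.apply(u);
                int target = repartitionRequired ? partitionFor(newKey, n) : i;
                input.get(target).add(new Update(newKey, u.customerRef(), u.payload()));
            }
        }
        List<String> joined = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            for (Update u : input.get(i)) {
                // Only the same-numbered table partition is visible here.
                String session = table.get(i).get(u.key());
                if (session != null) joined.add(u.payload() + "@" + session);
            }
        }
        return joined;
    }

    // Places each update in partition (partitionFor(customerRef) + shift) mod n.
    static List<List<Update>> placeUpdates(List<Update> updates, int n, int shift) {
        List<List<Update>> stream = new ArrayList<>();
        for (int i = 0; i < n; i++) stream.add(new ArrayList<>());
        for (Update u : updates) {
            stream.get(Math.floorMod(partitionFor(u.customerRef(), n) + shift, n)).add(u);
        }
        return stream;
    }

    public static void main(String[] args) {
        int n = 3;
        List<Map<String, String>> table = sessionTable(
                Map.of("cust-1", "s1", "cust-2", "s2", "cust-3", "s3"), n);
        List<Update> updates = List.of(
                new Update("u1", "cust-1", "deposit"),
                new Update("u2", "cust-2", "withdrawal"),
                new Update("u3", "cust-3", "deposit"));

        List<List<Update>> aligned = placeUpdates(updates, n, 0);    // producer routed by customerRef
        List<List<Update>> misaligned = placeUpdates(updates, n, 1); // routed some other way

        // Co-partitioned input: skipping the repartition loses nothing.
        System.out.println(joinRekeyed(aligned, table, Update::customerRef, false).size());    // 3
        System.out.println(joinRekeyed(aligned, table, Update::customerRef, true).size());     // 3
        // Input not partitioned by customerRef: skipping it drops every match.
        System.out.println(joinRekeyed(misaligned, table, Update::customerRef, false).size()); // 0
        System.out.println(joinRekeyed(misaligned, table, Update::customerRef, true).size());  // 3
    }
}
```

The third case is the failure mode behind the reporter's caution: nothing errors, the join just finds no partners. A GlobalKTable avoids the problem because every instance holds the whole table, so no partition alignment is needed.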