Michal Borowiecki created KAFKA-4835:
----------------------------------------
Summary: Allow users control over repartitioning
Key: KAFKA-4835
URL: https://issues.apache.org/jira/browse/KAFKA-4835
Project: Kafka
Issue Type: Improvement
Components: streams
Affects Versions: 0.10.2.0
Reporter: Michal Borowiecki
From
https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030
...it would be good to provide users more control over the repartitioning.
My use case is as follows (unrelated bits omitted for brevity):
{code}
// Re-key logins by customerRef (groupBy sets repartitionRequired)
KTable<String, Activity> loggedInCustomers = builder
    .stream("customerLogins")
    .groupBy((key, activity) -> activity.getCustomerRef())
    .reduce((first, second) -> second, loginStore());

// Re-key balance updates by customerRef (map sets repartitionRequired)
builder
    .stream("balanceUpdates")
    .map((key, activity) -> new KeyValue<>(activity.getCustomerRef(), activity))
    .join(loggedInCustomers, (activity, session) -> ...)
    .to("sessions");
{code}
In the underlying implementation, both "groupBy" and "map" set the
repartitionRequired flag (since the key changes), so the aggregation and the
join that follow each create a repartition topic.
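A rough toy model of the behaviour described above, to make the mechanism concrete. All class and method names here are hypothetical and this is not the Kafka Streams API. It only mimics the idea that a key-changing operation sets a flag and a later key-based operation inserts a repartition topic when that flag is set.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model (not the Kafka Streams API): tracks whether a
// stream's key may have changed since it was read from its source topic.
public final class RepartitionFlagModel {

    // Topology "nodes" added so far, recorded as plain strings.
    static final List<String> topology = new ArrayList<>();

    static final class Stream {
        final String name;
        final boolean repartitionRequired;

        Stream(String name, boolean repartitionRequired) {
            this.name = name;
            this.repartitionRequired = repartitionRequired;
        }

        // Key-changing operation (like map or groupBy): the new key may belong
        // on a different partition, so the flag is set.
        Stream map(String opName) {
            topology.add(opName);
            return new Stream(opName, true);
        }

        // Value-only operation (like mapValues): the key is unchanged, so the
        // flag is carried over as-is.
        Stream mapValues(String opName) {
            topology.add(opName);
            return new Stream(opName, repartitionRequired);
        }

        // Key-based operation (like a join or aggregation): if the key may have
        // changed, records are first routed through a repartition topic.
        Stream join(String opName) {
            if (repartitionRequired) {
                topology.add(name + "-repartition");
            }
            topology.add(opName);
            return new Stream(opName, false);
        }
    }

    static Stream source(String topic) {
        topology.add(topic);
        return new Stream(topic, false);
    }

    public static void main(String[] args) {
        source("balanceUpdates").map("rekeyByCustomerRef").join("joinWithLogins");
        // prints [balanceUpdates, rekeyByCustomerRef, rekeyByCustomerRef-repartition, joinWithLogins]
        System.out.println(topology);
    }
}
```

In this model the flag carries no information about how the input topic was actually partitioned. That is the gap the ticket points at.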
However, in our case I know that both input streams are already partitioned by
the customerRef value, which I'm mapping into the key (because it's required by
the join operation).
So two unnecessary intermediate topics are created, with their associated
overhead, while the ultimate goal is simply to join on a value that we
already use to partition the original streams anyway.
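A small sketch of why the repartition is redundant here. It assumes the upstream producer chose each record's partition from customerRef, which is carried in the value, rather than from the record key. It also uses `String.hashCode` as a hypothetical stand-in for Kafka's default partitioner, which actually applies murmur2 to the serialized key. Re-keying by customerRef only leaves every record where it already is when the same partitioner and the same partition count are used. That is the guarantee the user would be vouching for.

```java
import java.util.Arrays;
import java.util.List;

// Sketch under stated assumptions: the producer chose each record's partition
// from customerRef (carried in the value), not from the record key.
public final class CoPartitionCheck {

    // Hypothetical record as it sits in the input topic.
    static final class InputRecord {
        final String originalKey; // e.g. an activity id, not the customerRef
        final String customerRef; // the value field the join needs as its key
        final int partition;      // partition the producer wrote the record to

        InputRecord(String originalKey, String customerRef, int partition) {
            this.originalKey = originalKey;
            this.customerRef = customerRef;
            this.partition = partition;
        }
    }

    // Stand-in partitioner. Kafka's default partitioner applies murmur2 to the
    // serialized key; String.hashCode is used here only to stay self-contained.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // True if routing every record through a repartition topic keyed by
    // customerRef would leave it on the partition it already occupies,
    // i.e. the repartition step would move no data.
    static boolean repartitionIsRedundant(List<InputRecord> records, int repartitionPartitions) {
        for (InputRecord r : records) {
            if (partitionFor(r.customerRef, repartitionPartitions) != r.partition) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int inputPartitions = 8;
        // Simulated input: partition picked from customerRef, key is something else.
        List<InputRecord> records = Arrays.asList(
                new InputRecord("activity-1", "c-1", partitionFor("c-1", inputPartitions)),
                new InputRecord("activity-2", "c-2", partitionFor("c-2", inputPartitions)));

        // Same partitioner and partition count: re-keying moves nothing.
        System.out.println(repartitionIsRedundant(records, inputPartitions)); // true
        // Different partition count: "c-1" would move from partition 7 to 3.
        System.out.println(repartitionIsRedundant(records, 4)); // false
    }
}
```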
(Note, we don't have the option to re-implement the original input streams to
make customerRef the message key.)
I think it would be better to allow the user to decide (from their knowledge of
the incoming streams) whether a repartition is required for aggregation and
join operations (perhaps via overloaded versions of these methods that expose
the repartitionRequired flag?).
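One hypothetical shape for such an overload, reduced to the decision it would control. None of these names exist in Kafka Streams. This sketch models the hint as three-valued rather than as a bare boolean, so that callers who pass nothing keep today's inferred behaviour. That is a design choice made for illustration only.

```java
// Hypothetical sketch of the proposed override; none of these names exist in
// Kafka Streams.
public final class RepartitionOverride {

    // What a caller of a join/aggregation overload could pass. INFER keeps the
    // current behaviour, so existing topologies would be unaffected.
    enum RepartitionHint { INFER, REQUIRED, NOT_REQUIRED }

    // Decide whether to insert a repartition topic before a key-based operation.
    // keyMayHaveChanged plays the role of the internally tracked
    // repartitionRequired flag (set by map, groupBy, ...).
    static boolean shouldRepartition(boolean keyMayHaveChanged, RepartitionHint hint) {
        switch (hint) {
            case REQUIRED:
                return true;
            case NOT_REQUIRED:
                // The caller asserts the data is already partitioned by the new key.
                return false;
            case INFER:
            default:
                return keyMayHaveChanged;
        }
    }

    public static void main(String[] args) {
        // The join from the ticket: map() changed the key, but the caller knows
        // the input topic is already partitioned by customerRef.
        System.out.println(shouldRepartition(true, RepartitionHint.INFER));        // true
        System.out.println(shouldRepartition(true, RepartitionHint.NOT_REQUIRED)); // false
    }
}
```

Passing NOT_REQUIRED when the data is not actually co-partitioned would silently produce wrong join results, which is the risk of handing this decision to the user.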
An alternative would be to allow users to perform a join on a value other than
the key (a KeyValueMapper parameter to join, like the one used for joins with
global tables), but I expect that to be more involved and more error-prone for
people who don't understand the partitioning requirements well (whereas it's
safe for global tables, since every instance holds a full copy of the table).
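A toy illustration of why a value-based join is safe against a global table but not against a partitioned one. Plain Java maps stand in for state stores, all names are hypothetical, and the hash-mod partitioner is a stand-in rather than Kafka's murmur2. The mapper extracts customerRef from the value, playing the role a KeyValueMapper plays in a KStream-GlobalKTable join. Against a partitioned table, the lookup only sees the local shard, so it misses when the record was placed by a different key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Sketch with hypothetical names; plain maps stand in for state stores.
public final class ValueKeyedJoinSketch {

    // Stand-in hash-mod partitioner (String.hashCode), not Kafka's murmur2.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // The slice of a partitioned table that the task for one partition holds.
    static <T> Map<String, T> shard(Map<String, T> table, int partition, int numPartitions) {
        Map<String, T> local = new HashMap<>();
        for (Map.Entry<String, T> e : table.entrySet()) {
            if (partitionFor(e.getKey(), numPartitions) == partition) {
                local.put(e.getKey(), e.getValue());
            }
        }
        return local;
    }

    // Join a stream value against a table by a key derived from the value.
    // Returns null when the derived key is absent locally (an inner-join miss).
    static <V, T, R> R joinOnValue(V value, Map<String, T> localTable,
                                   Function<V, String> keySelector,
                                   BiFunction<V, T, R> joiner) {
        T match = localTable.get(keySelector.apply(value));
        return match == null ? null : joiner.apply(value, match);
    }

    public static void main(String[] args) {
        Map<String, String> sessions = new HashMap<>();
        sessions.put("c-1", "session-1");

        // Hypothetical balance update whose customerRef is the part before ':'.
        String update = "c-1:+10";
        Function<String, String> customerRef = v -> v.substring(0, v.indexOf(':'));
        BiFunction<String, String, String> joiner = (u, s) -> u + "@" + s;

        // Global table: every instance holds all keys, so the lookup succeeds.
        System.out.println(joinOnValue(update, sessions, customerRef, joiner)); // c-1:+10@session-1

        // Partitioned table: if this record is processed on partition 0, the local
        // shard only holds keys hashing to 0, and "c-1" hashes to 7 of 8.
        System.out.println(joinOnValue(update, shard(sessions, 0, 8), customerRef, joiner)); // null
    }
}
```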
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)