Jenny,
thanks for reaching out. Yes, we plan to remove the old variant of
`branch()` (which was deprecated with KIP-418) in the 4.0 release
(https://issues.apache.org/jira/browse/KAFKA-12824), which is planned for
the end of this year.
KIP-418 changes the pattern from relying on order to relying on *names*.
The KIP explains the naming strategy in detail:
The keys of the Map entries are defined by the following rules:
- If a Named parameter was provided for split, its value is used as a prefix for
each key. By default, no prefix is used
- If a name is provided for the branch, its value is appended to the prefix to
form the Map key
- If a name is not provided for the branch, then the key defaults to prefix + position of
the branch as a decimal number, starting from "1"
- If a name is not provided for the defaultBranch call, then the key defaults to prefix +
"0"
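To make the key rules above concrete, here is a small plain-Java model of them. It is a hypothetical sketch (the class `BranchKeys` and method `keys` are made up for illustration and are not Kafka Streams code), assuming the rules exactly as quoted: optional prefix, branch name or 1-based position, and "0" for an unnamed default branch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the KIP-418 key-naming rules; not Kafka Streams code.
public class BranchKeys {

    // prefix:      value of the Named passed to split(), or "" when none was given
    // names:       branch names in declaration order, null where no name was given
    // hasDefault:  whether defaultBranch() was called
    // defaultName: name given to the default branch, or null
    static List<String> keys(String prefix, List<String> names,
                             boolean hasDefault, String defaultName) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < names.size(); i++) {
            String name = names.get(i);
            // Unnamed branches fall back to their position, starting from "1".
            result.add(prefix + (name != null ? name : String.valueOf(i + 1)));
        }
        if (hasDefault) {
            // An unnamed default branch is keyed as prefix + "0".
            result.add(prefix + (defaultName != null ? defaultName : "0"));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> result = keys("process-", Arrays.asList("happy", null, "owo"), true, null);
        System.out.println(result);
        // prints [process-happy, process-2, process-owo, process-0]
    }
}
```

The point for positional consumers is that every key is predictable up front, so a caller never has to depend on the map's iteration order.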
The values of the Map entries are formed as follows:
- If no chain function or consumer is provided, then the value is the branch
itself (which is equivalent to the ks→ks identity chain function)
- If a chain function is provided and returns a non-null value for a given
branch, then the value is the result returned by this function
- If a chain function returns null for a given branch, then the respective
entry is not put to the map
- If a consumer is provided for a given branch, then the respective entry
is not put to the map
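The value rules can be modelled the same way. This is again a hypothetical sketch (`BranchValues` and `addBranch` are invented names), in which a `String` stands in for a `KStream` so it runs without Kafka on the classpath; it assumes a branch has either a chain function or a consumer, as in the quoted rules. The `LinkedHashMap` is only used to make the printed output deterministic and says nothing about the map type Kafka Streams uses.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model of the KIP-418 value rules; a String stands in for a KStream.
public class BranchValues {

    static void addBranch(Map<String, String> out, String key, String branch,
                          Function<String, String> chain, Consumer<String> consumer) {
        if (consumer != null) {
            consumer.accept(branch); // a consumed branch gets no map entry
            return;
        }
        // No chain function behaves like the identity chain function ks -> ks.
        String value = (chain != null) ? chain.apply(branch) : branch;
        if (value != null) {
            out.put(key, value); // a null chain result also means no entry
        }
    }

    public static void main(String[] args) {
        Map<String, String> out = new LinkedHashMap<>();
        addBranch(out, "1", "happyStream", null, null);
        addBranch(out, "2", "sadStream", s -> s + ".filtered", null);
        addBranch(out, "3", "owoStream", s -> null, null);
        addBranch(out, "4", "otherStream", null, s -> System.out.println("consumed " + s));
        System.out.println(out);
        // prints {1=happyStream, 2=sadStream.filtered}
    }
}
```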
To me, it seems to be an issue with Spring rather than with Kafka
Streams, if the corresponding binder does not use the new API in the
intended way?
In the end, why is the binder still returning an array anyway instead of
a `Map`? And if it does need to return an array, why does it "naively"
create the array via `streamMap.values().toArray(new KStream[0])`,
introducing the problem?
It seems Spring could implement the binder using more sophisticated
logic to fix the problem (or just return the `Map` instead of an array
to begin with)?
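One shape such logic could take, sketched under assumptions: since the keys are fully determined by the naming rules, the array can be filled by looking up each expected key rather than by iterating `values()`. The helper below (`OrderedBranches.toOrderedArray`) is hypothetical and not part of Kafka Streams or Spring; in a binder, `T` would be the `KStream` type and the keys might come from naming branches via `split(Named.as("process-"))` and `Branched.as("happy")`, which per the rules above yields keys like "process-happy".

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntFunction;

// Hypothetical helper: builds a positional array from the branch map by key
// lookup, so the result does not depend on the map's iteration order.
public class OrderedBranches {

    static <T> T[] toOrderedArray(Map<String, T> branches, List<String> keysInOrder,
                                  IntFunction<T[]> newArray) {
        T[] result = newArray.apply(keysInOrder.size());
        for (int i = 0; i < keysInOrder.size(); i++) {
            String key = keysInOrder.get(i);
            if (!branches.containsKey(key)) {
                // Fail loudly instead of silently shifting later bindings.
                throw new IllegalArgumentException("missing branch: " + key);
            }
            result[i] = branches.get(key);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> branches = new HashMap<>(); // iteration order unspecified
        branches.put("process-owo", "owoStream");
        branches.put("process-happy", "happyStream");
        branches.put("process-sad", "sadStream");
        String[] ordered = toOrderedArray(branches,
                Arrays.asList("process-happy", "process-sad", "process-owo"),
                String[]::new);
        System.out.println(Arrays.toString(ordered));
        // prints [happyStream, sadStream, owoStream]
    }
}
```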
HTH.
-Matthias
On 6/20/24 11:02 AM, Jenny Qiao wrote:
Hello,
KIP-418 (Kafka Streams >= 2.8) improved the branching return type from KStream<K, V>[] to
Map<String, KStream<K, V>> (a HashMap). This changes the output bindings when using multiple
branches in the Kafka Streams binder of Spring Cloud Stream: the bindings can end up wrong
because the map storing the KStream branches is unordered.
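Before KIP-418, if I have multiple branches, my function could look like this:
import java.util.function.Function;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;

public Function<KStream<String, String>, KStream<String, String>[]> process() {
    Predicate<String, String> isHappy = (k, v) -> v.contains(":D");
    Predicate<String, String> isSad = (k, v) -> v.contains(":(");
    Predicate<String, String> isOwo = (k, v) -> v.toLowerCase().contains("owo");
    // Deprecated KStream#branch(Predicate...): array order matches argument order
    return input -> input.branch(isHappy, isSad, isOwo);
}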
There is an output binding for each branch. Since the return type is a
KStream[] where the order of branches matches the supplied order, the
corresponding output bindings are process-out-0 (isHappy), process-out-1
(isSad), process-out-2 (isOwo).
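This changes when using the new branching impl:
import java.util.Map;
import java.util.function.Function;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;

public Function<KStream<String, String>, KStream<String, String>[]> process() {
    Predicate<String, String> isHappy = (k, v) -> v.contains(":D");
    Predicate<String, String> isSad = (k, v) -> v.contains(":(");
    Predicate<String, String> isOwo = (k, v) -> v.toLowerCase().contains("owo");
    return input -> {
        final Map<String, KStream<String, String>> streamMap = input
            .split()
            .branch(isHappy)
            .branch(isSad)
            .branch(isOwo)
            .noDefaultBranch();
        // values() follows the map's iteration order, not the branch order
        return streamMap.values().toArray(new KStream[0]);
    };
}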
The call to HashMap.values() doesn't necessarily return the KStreams in
supplied/insertion order. For example, if the order of KStreams in the return
corresponds to isSad, isOwo, isHappy, the output bindings would become process-out-0
(isSad), process-out-1 (isOwo), process-out-2 (isHappy), which differs from what is expected.
If the map were ordered like a LinkedHashMap, the return order and thus the bindings
would be correct.
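A minimal illustration of the ordering difference described above, using only the JDK: `LinkedHashMap.values()` is guaranteed to follow insertion order, whereas `HashMap` documents no ordering at all. The class `MapOrder` and the keys "1".."3" (the keys the KIP rules give unnamed branches with no prefix) are chosen purely for illustration; the sketch does not claim any particular `HashMap` order, only that it is not guaranteed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Compares values() order for an insertion-ordered map and a plain HashMap.
public class MapOrder {

    // Inserts three branches in declaration order and returns values() as a list.
    static List<String> valuesOf(Map<String, String> map) {
        map.put("1", "isHappy");
        map.put("2", "isSad");
        map.put("3", "isOwo");
        return new ArrayList<>(map.values());
    }

    public static void main(String[] args) {
        // Always [isHappy, isSad, isOwo]: insertion order is guaranteed.
        System.out.println(valuesOf(new LinkedHashMap<>()));
        // Some permutation of the same three values; the order is unspecified
        // and should not be relied upon for positional output bindings.
        System.out.println(valuesOf(new HashMap<>()));
    }
}
```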
Is this intended behaviour with KIP-418? If so, will the deprecated branching
methods be kept indefinitely for multiple output bindings to bind correctly?