Jenny,

Thanks for reaching out. Yes, we plan to remove the old variant of `branch()` (which was deprecated with KIP-418) in the 4.0 release (https://issues.apache.org/jira/browse/KAFKA-12824), which is planned for the end of this year.

KIP-418 changes the pattern from relying on order to relying on *names*. The KIP explains the naming strategy in detail:

The keys of the Map entries are defined by the following rules:

- If a Named parameter was provided for split, its value is used as a prefix for each key. By default, no prefix is used
- If a name is provided for the branch, its value is appended to the prefix to form the Map key
- If a name is not provided for the branch, then the key defaults to prefix + position of the branch as a decimal number, starting from "1"
- If a name is not provided for the defaultBranch call, then the key defaults to prefix + "0"

The values of the Map entries are formed as follows:

- If no chain function or consumer is provided, then the value is the branch itself (which is equivalent to ks→ks identity chain function)
- If a chain function is provided and returns a non-null value for a given branch, then the value is the result returned by this function
- If a chain function returns null for a given branch, then the respective entry is not put to the map
- If a consumer is provided for a given branch, then the respective entry is not put to the map
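
To make the key rules quoted above concrete, here is a rough sketch in plain Java with no Kafka Streams dependency. `BranchKeys` and `keysFor` are hypothetical names invented for this illustration, not part of any API, and the sketch only models the keys for `branch()` calls (not the default branch). It also assumes that "position" counts every `branch()` call, named or not.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: models the key rules quoted from KIP-418.
// BranchKeys and keysFor are hypothetical names, not Kafka Streams API.
class BranchKeys {

    // prefix: the value passed via Named to split(), or "" if none was given.
    // branchNames: one entry per branch() call, in call order; null means
    // that no name was provided for that branch.
    static List<String> keysFor(String prefix, List<String> branchNames) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < branchNames.size(); i++) {
            String name = branchNames.get(i);
            // Assumption: an unnamed branch falls back to its 1-based position.
            keys.add(prefix + (name != null ? name : String.valueOf(i + 1)));
        }
        return keys;
    }

    public static void main(String[] args) {
        // No prefix, no names: prints [1, 2, 3]
        System.out.println(keysFor("", Arrays.<String>asList(null, null, null)));
        // Prefix and names: prints [mood-happy, mood-sad, mood-owo]
        System.out.println(keysFor("mood-", Arrays.asList("happy", "sad", "owo")));
    }
}
```

So for three unnamed branches and no prefix, the entries can be looked up under the keys "1", "2", and "3", independent of the iteration order of the returned map.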



To me, this seems to be an issue with Spring rather than with Kafka Streams, if the corresponding binder does not use the new API in the intended way?

In the end, why is the binder still returning an array anyway instead of a `Map`? And if it does need to return an array, why does it "naively" create the array via `streamMap.values().toArray(new KStream[0])`, introducing the problem?

It seems Spring could implement the binder using more sophisticated logic to fix the problem (or just return the `Map` instead of an array to begin with)?
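
One possible shape of such logic, as a minimal sketch only: look the branches up by key in the intended order instead of relying on `values()`. `OrderedBranches` and `inKeyOrder` are hypothetical names made up for this example, and the type parameter `T` stands in for `KStream<K, V>` so that the snippet runs without Kafka Streams on the classpath.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: OrderedBranches and inKeyOrder are hypothetical names.
// T stands in for KStream<K, V> so the sketch has no Kafka Streams dependency.
class OrderedBranches {

    // Returns the map values in the order given by keys,
    // not in the iteration order of the map.
    static <T> List<T> inKeyOrder(Map<String, T> branches, List<String> keys) {
        List<T> ordered = new ArrayList<>();
        for (String key : keys) {
            T branch = branches.get(key);
            if (branch == null) {
                throw new IllegalArgumentException("No branch for key " + key);
            }
            ordered.add(branch);
        }
        return ordered;
    }

    public static void main(String[] args) {
        // Inserted out of order on purpose; the lookup order still wins.
        Map<String, String> branches = new HashMap<>();
        branches.put("3", "isOwo");
        branches.put("1", "isHappy");
        branches.put("2", "isSad");
        // Prints [isHappy, isSad, isOwo]
        System.out.println(inKeyOrder(branches, List.of("1", "2", "3")));
    }
}
```

With unnamed branches and no prefix, the keys would be "1", "2", "3" per the rules quoted from the KIP, so a function that must return an array could build it from such an ordered list rather than from `streamMap.values()`.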


HTH.



-Matthias


On 6/20/24 11:02 AM, Jenny Qiao wrote:
Hello,


KIP-418 (>=2.8) improved the branching return type from KStream<>[] to HashMap. 
This changes the output binding when using multiple branches in the Kafka Streams binder 
of Spring Cloud Stream - the bindings are possibly wrong because the map storing the 
KStream branches is unordered.



Before KIP-418, if I have multiple branches, my function could look like this:



public Function<KStream<String, String>, KStream<String, String>[]> process() {
    Predicate<String, String> isHappy = (k, v) -> v.contains(":D");
    Predicate<String, String> isSad = (k, v) -> v.contains(":(");
    Predicate<String, String> isOwo = (k, v) -> v.toLowerCase().contains("owo");
    // branch() returns the branches in the order the predicates are supplied
    return input -> input.branch(isHappy, isSad, isOwo);
}



There is an output binding for each branch. Since the return type is a 
KStream[] where the order of branches matches the supplied order, the 
corresponding output bindings are process-out-0 (isHappy), process-out-1 
(isSad), process-out-2 (isOwo).



This changes when using the new branching impl:



public Function<KStream<String, String>, KStream<String, String>[]> process() {
    // isHappy, isSad, isOwo: same predicates as in the example above
    return input -> {
        final Map<String, KStream<String, String>> streamMap = input
            .split()
            .branch(isHappy)
            .branch(isSad)
            .branch(isOwo)
            .noDefaultBranch();
        return streamMap.values().toArray(new KStream[0]);
    };
}



The call to HashMap<>.values() doesn't necessarily return the KStreams in 
supplied/insertion order. For example, if the order of KStreams in the return 
corresponds to isSad, isOwo, isHappy, the output bindings would become process-out-0 
(isSad), process-out-1 (isOwo), process-out-2 (isHappy) - different from expected. If 
the map was ordered like a LinkedHashMap, the return order and thus the bindings 
would be correct.



Is this intended behaviour with KIP-418? If so, will the deprecated branching 
methods be kept indefinitely for multiple output bindings to bind correctly?

