Hi all,
Thanks Sébastien for the follow-ups. From my side this is ready for a
vote, although I do have some more comments below. These are all nits
and non-blocking concerns, so I am happy to +1 the KIP even if we
haven't converged on a solution for the items below.
LB1 / LB2 / LB3 follow-up: Just to confirm one detail: in
multi-partition mode, the no-arg getStateStore/getKeyValueStore/etc.
throws, right? That seems like the only sane choice, but it would be
worth stating in the KIP. The exception would be global stores, where I
need to use the no-arg version to access partition 0, assuming you don't
have to declare partitions for global stores; I'm not sure this is
explicit enough in the KIP.
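The accessor contract in question can be sketched as a small self-contained model. It assumes that global stores need no partition declaration and are exposed at partition 0. `StoreAccessSketch` and its methods are hypothetical stand-ins, not TopologyTestDriver's API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the store-accessor contract; names are illustrative
// and not part of TopologyTestDriver's API.
final class StoreAccessSketch {
    private final boolean multiPartitionMode;
    private final Set<String> globalStores;
    // store name -> (partition -> store contents)
    private final Map<String, Map<Integer, Map<String, String>>> stores = new HashMap<>();

    StoreAccessSketch(final boolean multiPartitionMode, final Set<String> globalStores) {
        this.multiPartitionMode = multiPartitionMode;
        this.globalStores = globalStores;
    }

    // Partition-aware accessor: returns the store instance owned by one task.
    Map<String, String> getStateStore(final String name, final int partition) {
        return stores
            .computeIfAbsent(name, k -> new HashMap<>())
            .computeIfAbsent(partition, p -> new HashMap<>());
    }

    // No-arg accessor: unambiguous only in single-partition mode or for a
    // global store, which this sketch assumes lives at partition 0.
    Map<String, String> getStateStore(final String name) {
        if (multiPartitionMode && !globalStores.contains(name)) {
            throw new IllegalStateException(
                "Store '" + name + "' is partitioned; use getStateStore(name, partition)");
        }
        return getStateStore(name, 0);
    }
}
```

In single-partition mode both accessors resolve to the same instance, so existing tests keep working unchanged.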
LB4 follow-up: Routing all null-key records to partition 0 keeps
things deterministic, but it diverges from Kafka's DefaultPartitioner,
which spreads null keys across partitions via uniform sticky partitioning. The
practical effect is that null-key tests in multi-partition mode will
only ever exercise one task, which seems like it could mask the very
fan-out the KIP is meant to enable. Wouldn't round-robin be more
realistic?
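A minimal sketch of the round-robin alternative, assuming the driver keeps one counter per topic (`NullKeyRoundRobin` and `nextPartition` are hypothetical names). The per-topic sequence 0, 1, 2, 0, ... is reproducible across runs, yet null-key records still reach every task.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical round-robin routing for null-key records that carry no
// explicit partition. This is NOT the KIP's current behaviour (partition 0).
final class NullKeyRoundRobin {
    // Next partition per topic; every topic's sequence starts at 0.
    private final Map<String, Integer> nextByTopic = new HashMap<>();

    int nextPartition(final String topic, final int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        final int partition = nextByTopic.getOrDefault(topic, 0) % numPartitions;
        // Store the successor modulo n, so the counter never overflows.
        nextByTopic.put(topic, (partition + 1) % numPartitions);
        return partition;
    }
}
```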
LB6: Assuming the partition in TestRecord is part of its equals, I can
no longer write assertEquals(new TestRecord<>("k", "v"),
output.readRecord()), because the partitions will not match. I'm not
sure if I have a solution.
Thanks,
Lucas
On Wed, May 27, 2026 at 5:35 PM Sebastien Viale
<[email protected]> wrote:
>
> Hi all,
>
> Thanks for your remarks.
>
> AS:
> On the builder pattern, we agree it's a cleaner approach than both the
> state-machine and the two-class strategies. The separation between setup and
> execution phases would be made explicit at the API level rather than just at
> the implementation level.
> Concretely:
>
> // Multi-partition mode (new)
> TopologyTestDriver driver = new TopologyTestDriver(topology, config)
>         .withMultiPartitionMode()
>         .declareTopic("input", 3)
>         .declareTopic("output", 3)
>         .build();
>
> // Single-partition mode (unchanged)
> TopologyTestDriver driver = new TopologyTestDriver(topology, config);
>
> // Same topic declaration in both modes
> TestInputTopic<String, String> input = driver.createInputTopic(
>         "input", Serdes.String().serializer(), Serdes.String().serializer());
> TestOutputTopic<String, String> output = driver.createOutputTopic(
>         "output", Serdes.String().deserializer(), Serdes.String().deserializer());
>
> createInputTopic, createOutputTopic and pipeInput remain identical in both
> modes, so callers that never invoke withMultiPartitionMode() will observe
> strictly identical behaviour to the current release: no new code path, no
> risk of regression.
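To make the setup/execution split concrete, here is a self-contained sketch of the builder idea. `DriverSetup` and `FrozenLayout` are hypothetical names, and a real driver would also own the topology and tasks. The property it illustrates is that `build()` takes an immutable snapshot, so topic declarations cannot change once records start flowing.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical builder illustrating the setup/execution split; it shows the
// shape of the idea only, not the proposed TopologyTestDriver API.
final class DriverSetup {
    private final Map<String, Integer> partitionsByTopic = new HashMap<>();
    private boolean multiPartition = false;

    DriverSetup withMultiPartitionMode() {
        multiPartition = true;
        return this;
    }

    DriverSetup declareTopic(final String topic, final int partitions) {
        if (partitions < 1) {
            throw new IllegalArgumentException("partitions must be >= 1");
        }
        partitionsByTopic.put(topic, partitions);
        return this;
    }

    // Snapshot the declarations; later builder calls cannot affect the result.
    FrozenLayout build() {
        return new FrozenLayout(multiPartition, Map.copyOf(partitionsByTopic));
    }
}

// Immutable result of build(). Undeclared topics, and every topic in
// single-partition mode, fall back to one partition.
final class FrozenLayout {
    private final boolean multiPartition;
    private final Map<String, Integer> partitionsByTopic;

    FrozenLayout(final boolean multiPartition, final Map<String, Integer> partitionsByTopic) {
        this.multiPartition = multiPartition;
        this.partitionsByTopic = partitionsByTopic;
    }

    int partitionsFor(final String topic) {
        return multiPartition ? partitionsByTopic.getOrDefault(topic, 1) : 1;
    }
}
```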
>
> LB4:
> Records with a `null` key will be routed to partition 0 if no explicit
> partition is set.
>
> private int resolvePartition(final String topic, final byte[] keyBytes, final Integer explicit) {
>     // Undeclared topics default to a single partition.
>     final int n = Math.max(1, declaredPartitionsByTopic.getOrDefault(topic, 1));
>     if (explicit != null) {
>         // An explicit partition on the TestRecord wins, but must be in range.
>         if (explicit < 0 || explicit >= n) {
>             throw new IllegalArgumentException(
>                 "Partition " + explicit + " is out of range for topic '" + topic
>                     + "' (has " + n + " partitions). Declare a higher count"
>                     + " via declareTopic() if needed.");
>         }
>         return explicit;
>     }
>     // Null keys and single-partition topics always go to partition 0.
>     if (keyBytes == null || n == 1) {
>         return 0;
>     }
>     // Same hashing the producer's default partitioning uses for keyed records.
>     return Utils.toPositive(Utils.murmur2(keyBytes)) % n;
> }
>
>
> LB5:
> When a single pipeInput causes records to fan out across multiple downstream
> tasks, the test driver selects the task with the lowest current stream time.
> When multiple tasks share the same stream time (the common case immediately
> after a fan-out), the tie is broken by TaskId order: the tasks are stored in
> a TreeMap<TaskId, StreamTask>, and the loop only switches to a task with a
> strictly lower stream time, so the first task in iteration order wins. As
> you suggested, this ordering is deterministic and stable (ascending
> `(subtopologyId, partition)`).
>
> private final TreeMap<TaskId, StreamTask> multiSubTasks = new TreeMap<>();
>
> private StreamTask pickNextProcessableTask() {
>     StreamTask task = null;
>     long bestTime = Long.MAX_VALUE;
>     final long now = mockWallClockTime.milliseconds();
>     // TreeMap iterates in ascending TaskId order.
>     for (final StreamTask t : multiSubTasks.values()) {
>         if (!t.hasRecordsQueued() || !t.isProcessable(now)) {
>             continue;
>         }
>         final long streamTime = t.processorContext().currentStreamTimeMs();
>         // Strict '<' keeps the earliest (lowest TaskId) task on ties.
>         if (streamTime < bestTime) {
>             bestTime = streamTime;
>             task = t;
>         }
>     }
>     return task; // null if no task can make progress
> }
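The tie-breaking claim can be checked with a self-contained model that reduces the driver to a `TreeMap` of stream times. `TaskIdSketch` and `TaskPickerSketch` are hypothetical stand-ins for `TaskId` and `StreamTask`. The model assumes the `hasRecordsQueued()`/`isProcessable()` filter has already been applied.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for TaskId, ordered by (subtopology, partition).
final class TaskIdSketch implements Comparable<TaskIdSketch> {
    final int subtopology;
    final int partition;

    TaskIdSketch(final int subtopology, final int partition) {
        this.subtopology = subtopology;
        this.partition = partition;
    }

    @Override
    public int compareTo(final TaskIdSketch other) {
        final int bySubtopology = Integer.compare(subtopology, other.subtopology);
        return bySubtopology != 0 ? bySubtopology : Integer.compare(partition, other.partition);
    }

    @Override
    public String toString() {
        return subtopology + "_" + partition;
    }
}

final class TaskPickerSketch {
    // Stream time of each task that currently has processable records.
    private final TreeMap<TaskIdSketch, Long> streamTimeByTask = new TreeMap<>();

    void enqueue(final TaskIdSketch id, final long streamTime) {
        streamTimeByTask.put(id, streamTime);
    }

    // Lowest stream time first; ascending iteration plus a strict '<'
    // means the lowest task id wins ties. Returns null if nothing is queued.
    TaskIdSketch pickNext() {
        TaskIdSketch best = null;
        long bestTime = Long.MAX_VALUE;
        for (final Map.Entry<TaskIdSketch, Long> e : streamTimeByTask.entrySet()) {
            if (e.getValue() < bestTime) {
                bestTime = e.getValue();
                best = e.getKey();
            }
        }
        return best;
    }
}
```

With tasks 1_0, 0_2 and 0_1 all at stream time 100, `pickNext()` returns 0_1. Lowering 1_0 to 50 makes it the next pick.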
>
>
> We will update the KIP to reflect all the comments.
>
> Cheers
>
>
> From: Lucas Brutschy via dev <[email protected]>
> Date: Thursday, 21 May 2026 at 14:25
> To: [email protected] <[email protected]>
> Cc: Alieh Saeedi <[email protected]>; Lucas Brutschy
> <[email protected]>
> Subject: [EXT] Re: [DISCUSS] KIP-1238: Multipartition for TopologyTestDriver
> in Kafka Streams
>
>
> Hi all,
>
> Thanks for the updates. Sorry for the late response; somehow the
> latest message was not delivered to my inbox and I had to fetch it
> from the mailing list archives. A few thoughts on the recent points,
> and a couple of follow-ups.
>
> AS: The builder pattern is a good direction — it avoids forcing the
> user to reason about a mode/state machine, and it lets the existing
> single-partition flow stay untouched. The Compatibility section should
> state explicitly that existing constructors plus
> `createInputTopic/createOutputTopic/pipeInput/getStateStore` remain
> source- and binary-compatible for callers that never touch
> `withMultiPartitionMode()`, so we have a clear contract for existing
> tests.
>
> LB4: How are records with a `null` key routed in multi-partition mode?
> The real producer's `DefaultPartitioner` uses sticky partitioning for
> null keys, which would make tests non-deterministic. The KIP should
> specify the behavior — for example, round-robin in declared partition
> order, or requiring an explicit partition on `TestRecord` when the key
> is null.
>
> LB5: When a single `pipeInput` causes records to fan out across
> multiple downstream tasks, what is the processing order? A documented
> deterministic order (e.g. ascending `(subtopologyId, partition)`)
> would be valuable to state in the KIP.
>
> Thanks,
> Lucas
>