Hi all,

LB1 / LB2 / LB3 follow-up:
Yes, you are right: in multi-partition mode, calling one of
getStateStore(), getKeyValueStore(), getSessionStore(), or
getWindowStore() without specifying the partition should throw an
IllegalStateException.
I will update the KIP.
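A minimal sketch of that rule, using a hypothetical PartitionedStoreLookup class (the name and its methods are illustrative only, not the KIP's API or the driver's internals). Global stores, which you raised, are not modelled here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: illustrates the "no-arg accessor throws" rule only.
final class PartitionedStoreLookup<S> {
    private final boolean multiPartitionMode;
    // store name -> (partition -> store instance)
    private final Map<String, Map<Integer, S>> storesByName = new HashMap<>();

    PartitionedStoreLookup(final boolean multiPartitionMode) {
        this.multiPartitionMode = multiPartitionMode;
    }

    void register(final String storeName, final int partition, final S store) {
        storesByName.computeIfAbsent(storeName, k -> new HashMap<>()).put(partition, store);
    }

    // No-arg variant: only unambiguous when there is a single partition.
    S getStateStore(final String storeName) {
        if (multiPartitionMode) {
            throw new IllegalStateException(
                "Store '" + storeName + "' exists once per partition in multi-partition mode; "
                    + "use getStateStore(name, partition) instead.");
        }
        return getStateStore(storeName, 0);
    }

    // Partition-aware variant: returns null if the store or partition is unknown.
    S getStateStore(final String storeName, final int partition) {
        final Map<Integer, S> byPartition = storesByName.get(storeName);
        return byPartition == null ? null : byPartition.get(partition);
    }
}
```

In single-partition mode the no-arg call simply resolves to partition 0, so existing tests keep working unchanged.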

LB4:
I think we can indeed implement a round-robin mode for null keys, to stay
closer to the default partitioner's behavior.
I will update the KIP.
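To make the idea concrete, here is a rough sketch of resolvePartition() with a per-topic round-robin counter for null keys. The class name and the counter map are hypothetical, and it uses Arrays.hashCode() as a stand-in for Utils.murmur2() only so that it compiles without kafka-clients; the real driver would keep the murmur2-based hash.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of null-key round-robin routing, not the KIP's final code.
final class RoundRobinNullKeyPartitioner {
    private final Map<String, Integer> partitionsByTopic;
    // Next partition to assign to a null-key record, tracked per topic.
    private final Map<String, Integer> nextNullKeyPartition = new HashMap<>();

    RoundRobinNullKeyPartitioner(final Map<String, Integer> partitionsByTopic) {
        this.partitionsByTopic = partitionsByTopic;
    }

    int resolvePartition(final String topic, final byte[] keyBytes, final Integer explicit) {
        final int n = Math.max(1, partitionsByTopic.getOrDefault(topic, 1));
        if (explicit != null) {
            if (explicit < 0 || explicit >= n) {
                throw new IllegalArgumentException("Partition " + explicit
                    + " is out of range for topic '" + topic + "' (has " + n + " partitions)");
            }
            return explicit;
        }
        if (keyBytes == null) {
            // Cycle 0, 1, ..., n-1, 0, ... in declared partition order.
            final int p = nextNullKeyPartition.getOrDefault(topic, 0);
            nextNullKeyPartition.put(topic, (p + 1) % n);
            return p;
        }
        // Stand-in hash; the driver would use Utils.toPositive(Utils.murmur2(keyBytes)) % n.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % n;
    }
}
```

Because the counter is per topic and starts at 0, piping the same sequence of null-key records always yields the same assignment, so tests stay deterministic while still exercising every task.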

LB6:
We can make TestRecord.equals() and hashCode() exclude the partition
field.
Partition equality must then be asserted explicitly via getPartition()
when needed.
This preserves full backward compatibility: existing assertEquals
assertions on TestRecord remain valid in multi-partition mode.
What do you think?
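A minimal sketch of the equality contract, using a simplified, hypothetical stand-in for TestRecord (the real class also carries headers and a record timestamp, which are omitted here).

```java
import java.util.Objects;

// Hypothetical stand-in for TestRecord: equals/hashCode ignore the partition.
final class SimpleTestRecord<K, V> {
    private final K key;
    private final V value;
    private final Integer partition; // null when no partition was set

    SimpleTestRecord(final K key, final V value) {
        this(key, value, null);
    }

    SimpleTestRecord(final K key, final V value, final Integer partition) {
        this.key = key;
        this.value = value;
        this.partition = partition;
    }

    K key() { return key; }

    V value() { return value; }

    Integer getPartition() { return partition; }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SimpleTestRecord)) {
            return false;
        }
        final SimpleTestRecord<?, ?> that = (SimpleTestRecord<?, ?>) o;
        // Partition deliberately excluded so existing assertEquals checks keep passing.
        return Objects.equals(key, that.key) && Objects.equals(value, that.value);
    }

    @Override
    public int hashCode() {
        // Must use the same fields as equals().
        return Objects.hash(key, value);
    }
}
```

With this contract, assertEquals(new TestRecord<>("k", "v"), output.readRecord()) keeps passing, and a test that cares about routing adds a separate assertion on getPartition().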

cheers




On Fri, May 29, 2026 at 3:16 PM, Lucas Brutschy via dev <[email protected]>
wrote:

> Hi all,
>
> Thanks Sébastien for the follow-ups. From my side this is ready for a
> vote, although I do have some more comments below. These are all nits
> and non-blocking concerns, so I am happy to +1 the KIP even if we
> haven't converged on a solution for the items below.
>
> LB1 / LB2 / LB3 follow-up: Just to confirm one detail: in
> multi-partition mode, the no-arg getStateStore/getKeyValueStore/etc.
> throws, right? That seems like the only sane choice but it would be
> worth stating in the KIP. Except for global stores - where I need to
> use the non-arg version to access partition 0, assuming you don't have
> to declare partitions for global stores; I'm not sure this is explicit
> enough in the KIP.
>
> LB4 follow-up: Routing all null-key records to partition 0 keeps
> things deterministic, but it diverges from Kafka's DefaultPartitioner,
> which spreads null keys across partitions via uniform-sticky. The
> practical effect is that null-key tests in multi-partition mode will
> only ever exercise one task, which seems like it could mask the very
> fan-out the KIP is meant to enable. Wouldn't round-robin be more
> realistic?
>
> LB6: Assuming the partition in TestRecord is part of its equals, I can
> no longer write assertEquals(new TestRecord<>("k", "v"),
> output.readRecord()), because the partitions will not match. I'm not
> sure if I have a solution.
>
> Thanks,
> Lucas
>
> On Wed, May 27, 2026 at 5:35 PM Sebastien Viale
> <[email protected]> wrote:
> >
> > Hi all,
> >
> > Thanks for your remarks.
> >
> > AS:
> > On the builder pattern, we agree it's a cleaner approach than both the
> state-machine and the two-class strategies. The separation between setup
> and execution phases would be made explicit at the API level rather than
> just at the implementation level.
> > Concretely:
> >
> > // Multi-partition mode (new)
> > TopologyTestDriver driver = new TopologyTestDriver(topology, config)
> >     .withMultiPartitionMode()
> >     .declareTopic("input", 3)
> >     .declareTopic("output", 3)
> >     .build();
> >
> > // Single-partition mode (unchanged)
> > TopologyTestDriver driver = new TopologyTestDriver(topology, config);
> >
> > // Topic creation is the same in both modes
> > TestInputTopic<String, String> input = driver.createInputTopic(
> >     "input", Serdes.String().serializer(), Serdes.String().serializer());
> > TestOutputTopic<String, String> output = driver.createOutputTopic(
> >     "output", Serdes.String().deserializer(), Serdes.String().deserializer());
> >
> > createInputTopic, createOutputTopic and pipeInput remain identical in
> > both modes, so callers that never invoke withMultiPartitionMode() will
> > observe behaviour strictly identical to the current release: no new code
> > path, no risk of regression.
> >
> > LB4:
> > Records with a `null` key will be routed to partition 0 if no explicit
> > partition is set.
> >
> > private int resolvePartition(final String topic, final byte[] keyBytes,
> >                              final Integer explicit) {
> >    final int n = Math.max(1, declaredPartitionsByTopic.getOrDefault(topic, 1));
> >    // An explicit partition on the TestRecord always wins, if it is in range.
> >    if (explicit != null) {
> >        if (explicit < 0 || explicit >= n) {
> >            throw new IllegalArgumentException(
> >                "Partition " + explicit + " is out of range for topic '" + topic
> >                    + "' (has " + n + " partitions). Declare a higher count"
> >                    + " via declareTopic() if needed.");
> >        }
> >        return explicit;
> >    }
> >    // Null keys and single-partition topics go to partition 0.
> >    if (keyBytes == null || n == 1) {
> >        return 0;
> >    }
> >    // Same key hashing as the producer's built-in partitioner.
> >    return Utils.toPositive(Utils.murmur2(keyBytes)) % n;
> > }
> >
> >
> > LB5:
> > When a single pipeInput causes records to fan out across multiple
> > downstream tasks, the test driver selects the task with the lowest current
> > stream time.
> > When multiple tasks share the same stream time (the common case
> > immediately after a fan-out), ties are broken by TaskId order: the tasks
> > are stored in a TreeMap<TaskId, StreamTask>, so, as you suggest, the
> > ordering is deterministic and stable (ascending `(subtopologyId, partition)`).
> >
> > // Iterated in ascending TaskId order, i.e. (subtopologyId, partition).
> > private final TreeMap<TaskId, StreamTask> multiSubTasks = new TreeMap<>();
> >
> > private StreamTask pickNextProcessableTask() {
> >   StreamTask task = null;
> >   long bestTime = Long.MAX_VALUE;
> >   final long now = mockWallClockTime.milliseconds();
> >   for (final StreamTask t : multiSubTasks.values()) {
> >     // Skip tasks with nothing buffered or not yet processable.
> >     if (!t.hasRecordsQueued() || !t.isProcessable(now)) {
> >       continue;
> >     }
> >     final long streamTime = t.processorContext().currentStreamTimeMs();
> >     // Strict '<' keeps the first task in TaskId order on ties.
> >     if (streamTime < bestTime) {
> >       bestTime = streamTime;
> >       task = t;
> >     }
> >   }
> >   return task;
> > }
> >
> >
> > We will update the KIP to reflect all of these comments.
> >
> > Cheers
> >
> >
> > From: Lucas Brutschy via dev <[email protected]>
> > Date: Thursday, May 21, 2026 at 14:25
> > To: [email protected] <[email protected]>
> > Cc: Alieh Saeedi <[email protected]>; Lucas Brutschy <[email protected]>
> > Subject: [EXT] Re: [DISCUSS] KIP-1238: Multipartition for TopologyTestDriver in Kafka Streams
> >
> >
> > Hi all,
> >
> > Thanks for the updates. Sorry for the late response; somehow the
> > latest response was not delivered to my inbox and I had to fetch it
> > from the mailing list archives. A few thoughts on the recent points,
> > and a couple of follow-ups.
> >
> > AS: The builder pattern is a good direction — it avoids forcing the
> > user to reason about a mode/state machine, and it lets the existing
> > single-partition flow stay untouched. The Compatibility section should
> > state explicitly that existing constructors plus
> > `createInputTopic/createOutputTopic/pipeInput/getStateStore` remain
> > source- and binary-compatible for callers that never touch
> > `withMultiPartitionMode()`, so we have a clear contract for existing
> > tests.
> >
> > LB4: How are records with a `null` key routed in multi-partition mode?
> > The real producer's `DefaultPartitioner` uses sticky partitioning for
> > null keys, which would make tests non-deterministic. The KIP should
> > specify the behavior — for example, round-robin in declared partition
> > order, or requiring an explicit partition on `TestRecord` when the key
> > is null.
> >
> > LB5: When a single `pipeInput` causes records to fan out across
> > multiple downstream tasks, what is the processing order? A documented
> > deterministic order (e.g. ascending `(subtopologyId, partition)`)
> > would be valuable to state in the KIP.
> >
> > Thanks,
> > Lucas
> >
> >
> >
> >
> >
> >
>
