Hi all,

Thank you all for your answers.


AS:

Indeed, the state machine is heavier to support. The builder pattern sounds 
like a good idea to us. It keeps the API clean, makes the multi-partition mode 
explicit, and preserves full backward compatibility.

We would also love to hear what others think before we commit to this direction.
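
To make the shape concrete, here is a minimal sketch of the builder flow from 
Alieh's example. SketchDriver and partitionCount() are hypothetical stand-ins 
for illustration only, not the real TopologyTestDriver; only the names 
withMultiPartitionMode(), declareTopic() and build() come from the thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the driver; not the real TopologyTestDriver.
final class SketchDriver {
    private final Map<String, Integer> declaredPartitions;

    // Legacy path: nothing declared, every topic is single-partition.
    SketchDriver() {
        this(new LinkedHashMap<>());
    }

    private SketchDriver(Map<String, Integer> declaredPartitions) {
        this.declaredPartitions = declaredPartitions;
    }

    // Entry point for the explicit multi-partition mode.
    MultiPartitionBuilder withMultiPartitionMode() {
        return new MultiPartitionBuilder();
    }

    // Topics that were never declared fall back to one partition.
    int partitionCount(String topic) {
        return declaredPartitions.getOrDefault(topic, 1);
    }

    static final class MultiPartitionBuilder {
        private final Map<String, Integer> partitions = new LinkedHashMap<>();

        MultiPartitionBuilder declareTopic(String topic, int numPartitions) {
            if (numPartitions < 1) {
                throw new IllegalArgumentException("numPartitions must be >= 1");
            }
            partitions.put(topic, numPartitions);
            return this;
        }

        // In the proposal, this is the point where tasks would be created.
        SketchDriver build() {
            return new SketchDriver(new LinkedHashMap<>(partitions));
        }
    }
}
```

Existing tests would keep using the plain constructor, so nothing changes for 
them; only tests that opt in go through declareTopic() and build().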


LB1 - State store access in multi-partition mode

Indeed, getStateStore(name) is ambiguous in multi-partition mode. We plan to 
introduce partition-aware overloads for each store:

  *   getStateStore(String, partition)
  *   getKeyValueStore(String, partition)
  *   getTimestampedKeyValueStore(String, partition)
  *   getVersionedKeyValueStore(String, partition)
  *   getWindowStore(String, partition)
  *   getTimestampedWindowStore(String, partition)
  *   getSessionStore(String, partition)

The existing single-argument forms are documented at 
https://kafka.apache.org/42/javadoc/org/apache/kafka/streams/TopologyTestDriver.html


Because each overload returns the store instance owned by the selected 
partition's task, mutating operations such as put() will be fully supported.

The no-arg form would continue to work for global stores (not partitioned) and 
for single-partition topologies (only one task owns the store).
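
A rough sketch of the lookup rules, using plain maps as stand-ins for state 
stores. StoreRegistrySketch and its methods are hypothetical names. Throwing 
when the partition-less form is ambiguous is an assumption of this sketch, not 
something decided in the KIP.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry; plain maps stand in for real state stores.
final class StoreRegistrySketch {
    // store name -> partition -> store instance
    private final Map<String, Map<Integer, Map<String, String>>> stores = new HashMap<>();

    void register(String name, int partition) {
        stores.computeIfAbsent(name, n -> new HashMap<>())
              .put(partition, new HashMap<>());
    }

    // Partition-aware lookup: returns the instance owned by that partition's task.
    Map<String, String> getStore(String name, int partition) {
        Map<Integer, Map<String, String>> byPartition = stores.get(name);
        if (byPartition == null || !byPartition.containsKey(partition)) {
            throw new IllegalArgumentException(
                "No store '" + name + "' for partition " + partition);
        }
        return byPartition.get(partition);
    }

    // Partition-less lookup: only unambiguous when exactly one instance exists
    // (global store or single-partition topology).
    Map<String, String> getStore(String name) {
        Map<Integer, Map<String, String>> byPartition = stores.get(name);
        if (byPartition == null) {
            throw new IllegalArgumentException("Unknown store '" + name + "'");
        }
        if (byPartition.size() != 1) {
            // Assumption of this sketch: fail loudly instead of picking one.
            throw new IllegalStateException(
                "Store '" + name + "' has " + byPartition.size() + " instances");
        }
        return byPartition.values().iterator().next();
    }
}
```

A put() through the two-argument form only touches the selected instance, which 
matches what a single task would see in production.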


LB2 - Internal repartition routing:

Today the driver re-enqueues everything on partition 0.

With the KIP, once a task has processed a record, any records it produces to 
internal or source topics are routed to the correct downstream partition 
before the next processing step. If the sink node carries an explicit partition 
(e.g. via a custom StreamPartitioner), that value is used directly. Otherwise, 
the driver derives the target partition from the record's new key using the 
same hash function as the real Kafka producer, so the routing is consistent 
with what would happen on a live cluster.
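
The routing decision can be sketched as follows. RoutingSketch, 
targetPartition() and placeholderHash() are hypothetical names. The real 
producer hashes the serialized key with murmur2; to stay dependency-free, this 
sketch substitutes Arrays.hashCode, so the concrete partition numbers it 
yields will not match a real cluster. Null keys are not handled here.

```java
import java.util.Arrays;

// Hypothetical helper; not part of the driver or of the KIP text.
final class RoutingSketch {

    // Placeholder hash so the sketch has no Kafka dependency. It is NOT
    // murmur2, so results differ from what a real producer would compute.
    static int placeholderHash(byte[] keyBytes) {
        return Arrays.hashCode(keyBytes);
    }

    // explicitPartition is null when the sink node did not choose a partition.
    static int targetPartition(Integer explicitPartition,
                               byte[] keyBytes,
                               int numPartitions) {
        if (explicitPartition != null) {
            if (explicitPartition < 0 || explicitPartition >= numPartitions) {
                throw new IllegalArgumentException(
                    "Invalid partition " + explicitPartition);
            }
            return explicitPartition;
        }
        // Clear the sign bit so the modulo result is never negative.
        return (placeholderHash(keyBytes) & 0x7fffffff) % numPartitions;
    }
}
```

The important property is the precedence: an explicit partition wins, and the 
key hash is only consulted when none was set.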

To support this, we plan to introduce a two-level map structure keyed by topic 
and then by partition:

  Map<String, Map<Integer, Queue<ProducerRecord<byte[], byte[]>>>> outputByTopicPartition

This allows the driver to efficiently enqueue each produced record into the 
queue of the correct (topic, partition) pair, and then hand it off to the task 
that owns that pair. The processing loop then continues until all task queues 
are quiescent. This means that a key change before a repartition operator will 
naturally cause the downstream record to land on a different task — which is 
precisely the scenario this KIP is designed to make testable.
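
As an illustration, a simplified version of that structure and loop could look 
like the sketch below. QueueLoopSketch, Routed and Task are hypothetical names, 
strings replace ProducerRecord<byte[], byte[]>, and the order in which queues 
are visited is an assumption of the sketch, not a commitment of the KIP.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;

final class QueueLoopSketch {
    // Stand-in for a produced record whose target partition is already known.
    static final class Routed {
        final String topic;
        final int partition;
        final String value;

        Routed(String topic, int partition, String value) {
            this.topic = topic;
            this.partition = partition;
            this.value = value;
        }
    }

    // Stand-in for the task owning a (topic, partition) pair.
    interface Task {
        List<Routed> process(Routed in);
    }

    // topic -> partition -> pending records
    private final Map<String, Map<Integer, Queue<Routed>>> pending = new TreeMap<>();
    final List<String> trace = new ArrayList<>();

    void enqueue(Routed r) {
        pending.computeIfAbsent(r.topic, t -> new TreeMap<>())
               .computeIfAbsent(r.partition, p -> new ArrayDeque<>())
               .add(r);
    }

    // Keeps processing until a full pass finds every queue empty.
    void runUntilQuiescent(Task task) {
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            // Snapshot the queues: processing may create new ones.
            List<Queue<Routed>> queues = new ArrayList<>();
            for (Map<Integer, Queue<Routed>> byPartition : pending.values()) {
                queues.addAll(byPartition.values());
            }
            for (Queue<Routed> queue : queues) {
                Routed next = queue.poll();
                if (next == null) {
                    continue;
                }
                progressed = true;
                trace.add(next.topic + "-" + next.partition + ":" + next.value);
                for (Routed out : task.process(next)) {
                    enqueue(out);
                }
            }
        }
    }
}
```

The trace list is only there so a test can assert which (topic, partition) 
pair saw each record.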


LB3 - Multi-subtopology task model:

You are correct that "one task per partition" understates the model. We will 
update the KIP to make the actual model explicit:

  TaskId = (subtopologyId, partition)

For N sub-topologies and P partitions: N × P tasks, consistent with production 
Kafka Streams.

For example, with 2 sub-topologies and 2 partitions:

  Task(0,0) — subtopology 0, partition 0
  Task(0,1) — subtopology 0, partition 1
  Task(1,0) — subtopology 1, partition 0
  Task(1,1) — subtopology 1, partition 1

With pipeInput(record, partition=0):
  → Task(0,0) processes the record
  → emits to repartition topic; hash(newKey) % 2 = 1
  → record enqueued on Task(1,1)
  → Task(1,1) processes it
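
The enumeration itself is small. In the sketch below, TaskModelSketch and 
TaskIdSketch are hypothetical names that mirror the notation used in this 
mail; they are not the real TaskId class.

```java
import java.util.ArrayList;
import java.util.List;

final class TaskModelSketch {
    // Mirrors the (subtopologyId, partition) pair described above.
    static final class TaskIdSketch {
        final int subtopologyId;
        final int partition;

        TaskIdSketch(int subtopologyId, int partition) {
            this.subtopologyId = subtopologyId;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return "Task(" + subtopologyId + "," + partition + ")";
        }
    }

    // Creates one task per (sub-topology, partition) pair: N x P in total.
    static List<TaskIdSketch> createTasks(int numSubtopologies, int numPartitions) {
        List<TaskIdSketch> tasks = new ArrayList<>();
        for (int s = 0; s < numSubtopologies; s++) {
            for (int p = 0; p < numPartitions; p++) {
                tasks.add(new TaskIdSketch(s, p));
            }
        }
        return tasks;
    }
}
```

With createTasks(2, 2) this yields the four tasks listed above, in the same 
order.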


We will update the KIP to reflect all of the above and look forward to your 
feedback.

Cheers!

Julien & Adam & Marie-Laure & Sébastien


From: Lucas Brutschy via dev <[email protected]>
Date: Monday, 20 April 2026 at 21:33
To: [email protected] <[email protected]>
Cc: Alieh Saeedi <[email protected]>; Lucas Brutschy 
<[email protected]>
Subject: [EXT] Re: [DISCUSS] KIP-1238: Multipartition for TopologyTestDriver in 
Kafka Streams

Warning External sender Do not click on any links or open any attachments 
unless you trust the sender and know the content is safe.

Hi all,

Thanks for the KIP and the ongoing discussion. I have a few questions
about the draft:

LB1: How should state store access work in multi-partition mode? Today
getKeyValueStore("myStore") returns the one store owned by the single
task. With multiple partitions, there are multiple store instances,
one per task. Would it make sense to add a getKeyValueStore(name,
partition) overload, and/or have the no-argument form return a
composite read-only view? Related: how should this interact with
mutating operations like put?

LB2: Could the KIP describe how repartition routing works internally?
Today the code re-enqueues records from internal topics back into
partition 0. In multi-partition mode, records written to a repartition
topic would need to be routed through the partitioner on the new key
so they land on the correct downstream task. Since this is the core
scenario the KIP tries to enable, understanding this mechanism would
be helpful.

LB3: How are multi-subtopology topologies handled? The driver today
creates one task per subtopology (currently just one, with TaskId(0,
0)). In multi-partition mode, a topology with N subtopologies and P
partitions would logically need N × P tasks — so the "one task per
partition" framing in the KIP may be understating it. Could the KIP
clarify the task model for multi-subtopology cases, including
processing order across tasks after a single pipeInput?

Thanks,
Lucas

This email was screened for spam and malicious content but exercise caution 
anyway.




On Thu, Apr 16, 2026 at 11:48 AM Alieh Saeedi via dev
<[email protected]> wrote:
>
> Hi all
>
> Thanks for the KIP and the interesting discussion!
>
> About backward compatibility:
> - What Matthias suggested reminded me of the state machine. It's a very
> clever idea but I think it's too complex for the user and also seems a bit
> error-prone. We must address all edge cases.
> - The alternative of keeping two classes around until AK 5.0 feels like a
> heavy maintenance burden for roughly two years, and it would also require
> significant code changes once AK 5.0 is released. The question is how much
> code reuse can we achieve (to avoid duplication)?
> - I’ve tried to follow the reasoning, but I may be missing the point. My
> question is: why don’t we introduce an explicit multi-partition mode with a
> clean separation between setup and execution phases, while still
> maintaining 100% backward compatibility? We can use the builder pattern
> like `TopologyTestDriver driver = new TopologyTestDriver(topology,
> config).withMultiPartitionMode().declareTopic("input", 3)....build();` for
> multi-partition usages. Create the tasks in `build()` and keep
> `createInputTopic` as-is, so that both single- and multi-partition users
> can still call it. Inside `createInputTopic`, we check whether the topic is
> in the list/map of multi-partition topics: if so, we create it with the
> specified number of partitions; if not, we create it with a single partition.
>
> nit: It would be great if you could update the KIP with the correct
> discussion thread link:
>
> https://lists.apache.org/thread/7nmfsf3fcgdqdp4t2y14y7npfwgrv0yp
>  (maybe?!).
> I usually follow the discussion in the thread rather than via the mailing
> list:)
>
> Bests,
> Alieh
>
> On Mon, Mar 9, 2026 at 3:13 PM Marie-Laure Momplot <
> [email protected]> wrote:
>
> > Big thanks for your feedback, Matthias — much appreciated !
> >
> > MJS1:
> > Regarding backward compatibility, another option could be to introduce a
> > new class, "MultiPartitionTopologyTestDriver".
> > This would allow us to keep the current behavior where input and output
> > topics must be created before sending records.
> > It would also avoid introducing a multi/active mode in the existing
> > "TopologyTestDriver".
> > The new class could coexist alongside TopologyTestDriver starting in Kafka
> > 4.x (for example 4.3 or 4.4).
> > "TopologyTestDriver" could then be deprecated in Kafka 5, while
> > MultiPartitionTopologyTestDriver would become the default and only driver
> > in Kafka 6.
> > In this new class, topics would be multi-partitioned by default, which
> > would align with the intended behavior of a test driver for Kafka.
> >
> > MJS2:
> >
> > We agree that using ConsumerRecords-like semantics could be an interesting
> > alternative for reading from multi-partition output topics — it’s closer to
> > the Kafka consumer API and may feel more familiar. As you said, there are
> > trade-offs: it could be more verbose for simple test scenarios, and we
> > would need to decide whether to expose partition-level access directly.
> >
> > I do not think the second example here
> >
> > >> // Optional helper: read directly from a specific partition
> > >> TestOutputTopic<String, String> partition1Topic = outputTopic.partition(1);
> > >> List<TestRecord<String, String>> partition1Records =
> > >> partition1Topic.readRecordsToList();
> >
> > is really necessary when testing and reading results. With the default
> > partitioner, the first example is sufficient and correct.
> >
> > Thank you for your help.
> >
> > Best regards,
> >
> >
> > Adam, Julien, Sébastien and Marie-Laure
> >




From: Matthias J. Sax <[email protected]>
Date: Tuesday, 3 March 2026 at 23:50
To: [email protected] <[email protected]>
Subject: Re: [DISCUSS] KIP-1238: Multipartition 
for TopologyTestDriver in Kafka Streams

Thanks for the follow ups.

MJS1:

>> One concern is backward compatibility with existing tests. We could make 
>> driver.init() mandatory only if at least one topic has more than one 
>> partition, keeping single-partition tests unchanged.

Yes, I had the same thought. We should not break existing testing code.

It will be an implementation detail how to wire it all up correctly,
but it seems doable:

I think the way to do it would be to have an internal TTD flag (which
test topics also have access to) about "single partition mode" vs "multi
partition mode", plus "inactive" vs "active".

We start with single/inactive and allow creating any input topics. As
long as only single-partition topics are created, we stay in
single/inactive mode. If a multi-partition topic gets created, we go to
multi/inactive mode.

As long as we are in single/inactive, topics can be used at any point.
If a topic is used, we switch to single/active. From then on, we only
allow adding new single-partition topics.

Once we have entered multi/inactive mode, we disallow using input
topics. The user can call `init()` to transition to `multi/active`.

Once we are in `multi/active`, no new topics can be created any longer, but
test topics can now be used.





MJS2: I like the examples overall. For reading from a multi-partition
output topic, I was also thinking along the lines of `ConsumerRecords`
as an alternative approach. Not saying it's better; there are
tradeoffs between both. But I am curious to hear what you think about it.

I do like this example:

>> // Reading output
>> List<TestRecord<String, String>> outputRecords = 
>> outputTopic.readRecordsToList();
>>
>> // Filter by partition if needed
>> List<TestRecord<String, String>> partition0Records = outputRecords.stream()
>> .filter(r -> r.getPartition() == 0)
>> .collect(Collectors.toList());


But I am not a fan of

>> // Optional helper: read directly from a specific partition
>> TestOutputTopic<String, String> partition1Topic = outputTopic.partition(1);
>> List<TestRecord<String, String>> partition1Records = 
>> partition1Topic.readRecordsToList();

It overloads `TestOutputTopic` in a weird way. If we wanted to go
this route, we could instead introduce a `TestOutputTopicPartition`?

Or do we think adding `TestOutputTopicPartition` would be too much?



-Matthias





On 2/27/26 7:28 AM, Sebastien Viale wrote:
> Hi,
>
> Thanks for your remarks,
>
> MJS1:
> Indeed, I started looking into a possible update for Tasks and GlobalTasks, 
> and it seems this introduces a lot of complexity.
>
> I initially considered simply forbidding the creation of input/output topics 
> after records have been sent; however, introducing a dedicated setup phase 
> seems cleaner and easier to implement.
>
> One concern is backward compatibility with existing tests. We could make 
> driver.init() mandatory only if at least one topic has more than one 
> partition, keeping single-partition tests unchanged.
>
> MJS2:
> Good catch — your solution reduces the number of pipeInput and readRecord 
> overloads. We just need to add a new partition property to TestRecord.
>
> To summarize, a test could look like this:
>
> // Setup and initialization
> TopologyTestDriver driver = new TopologyTestDriver(...);
> driver.createInputTopic("topic1", 2);
> driver.createOutputTopic("topic2", 2);
>
> // Topics exist but cannot pipe records yet
> driver.init(); // tasks and global tasks are created
>
> // Partition 0 and 1 are valid
> TestRecord<String, String> record0 = new TestRecord<>("key0", "value0", 
> 1000L, 0);
> TestRecord<String, String> record1 = new TestRecord<>("key1", "value1", 
> 1000L, 1);
>
> inputTopic.pipeInput(record0);
> inputTopic.pipeInput(record1);
>
> // Invalid partition → exception
> TestRecord<String, String> record2 = new TestRecord<>("key2", "value2", 
> 1000L, 2);
> inputTopic.pipeInput(record2); // throws IllegalArgumentException
>
> // Reading output
> List<TestRecord<String, String>> outputRecords = 
> outputTopic.readRecordsToList();
>
> // Filter by partition if needed
> List<TestRecord<String, String>> partition0Records = outputRecords.stream()
> .filter(r -> r.getPartition() == 0)
> .collect(Collectors.toList());
>
> // Optional helper: read directly from a specific partition
> TestOutputTopic<String, String> partition1Topic = outputTopic.partition(1);
> List<TestRecord<String, String>> partition1Records = 
> partition1Topic.readRecordsToList();
>
>
> We will wait for other comments, if any, before updating the KIP.
>
> regards,
> Marie Laure, Adam, Julien and Sébastien
>
>
> From: Matthias J. Sax <[email protected]>
> Date: Sunday, 22 February 2026 at 23:56
> To: [email protected] <[email protected]>
> Subject: Re: [DISCUSS] KIP-1238: Multipartition for TopologyTestDriver 
> in Kafka Streams
>
> Hello everyone,
>
> thanks for this KIP. It's indeed a long-standing feature request to
> support multi-partition TTD, so I am very happy to see this KIP (sorry
> for ignoring it for way too long...)
>
>
> A couple of questions:
>
>
> MJS1: The KIP states:
>
>> Each time a input topic is created with a partition number higher than the 
>> max partition number already defined, the Tasks and GlobalTasks will be 
>> updated.
>
> This sounds quite complex? Atm, because TTD is single-partitioned, one
> can call `createInputTopic()` and `createOutputTopic()` at any time, and
> pipe records through the topology interleaved. Because there is only a
> single partition/task, it does not make things complicated.
>
> However, with multiple partitions and tasks, I am worried that we might
> introduce unnecessary complexity (potentially in both the TTD impl, as
> well as for the user). Could it make sense to add a dedicated setup
> phase? I.e., all input/output topics must be created before we allow
> piping records? The control flow would be something like:
>
>> TopologyTestDriver driver = new TopologyTestDriver(...);
>> driver.createInputTopic(...)
>> driver.createOutputTopic(...)
>>
>> // created TestInputTopics / TestOutputTopics cannot be used yet
>>
>> driver.init(); // sets up the driver and creates tasks
>>
>> // now TestInputTopic / TestOutputTopic can be used
>
>
>
> MJS2: I am somewhat concerned about the number of overloads we are
> adding to `TestInputTopic` and `TestOutputTopic`.
>
> Would it be simpler to instead modify `TestRecord` and allow setting the
> partition there, to avoid all the overloads to `pipeInput`? I would
> assume that most tests would rely on `DefaultPartitioner` anyway, and it
> seems ok to force the usage of `TestRecord` if a partition must be set
> explicitly?
>
> For `TestOutputTopic` the partition number of each record would be set
> on the `TestRecord` again (for `readRecord()` and `readRecordsToList()`).
> We could also introduce `TestOutputTopicPartition` and add
> `TestOutputTopic.partition(int)` to allow reading data from a specific
> partition?
>
>
>
> -Matthias
>
>
>
>
>
>
> On 11/12/25 11:48 PM, Sebastien Viale wrote:
>> Hi Everyone,
>>
>> I would like to start a discussion on KIP-1238: Multipartition for 
>> TopologyTestDriver in Kafka Streams
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1238%3A+Multipartition+for+TopologyTestDriver+in+Kafka+Streams
>>
>> This KIP proposes to introduce multi-partition support in the 
>> TopologyTestDriver, enabling more accurate and convenient stream testing 
>> while improving automated unit test coverage.
>>
>>
>> Regards,
>>
>>
>> Julien & Adam & Marie-Laure & Sébastien
>>
>>
