chickenchickenlove opened a new pull request, #20968:
URL: https://github.com/apache/kafka/pull/20968
### Description
This PR refactors the `DSL`'s optimization logic (reusing the input topic as
the changelog) to leverage the public `Topology#addReadOnlyStateStore` API
introduced in KIP-813, replacing the previous usage of internal builder methods.
### Key Changes:
- Refactored `TableSourceNode`:
  - Replaced the manual wiring of sources and stores (via `addSource`,
    `addProcessor`, `connectSourceStoreAndTopic`) with a call to
    `topologyBuilder.addReadOnlyStateStore`.
- Updated `InternalTopologyBuilder`:
  - Added an overloaded `addReadOnlyStateStore` method accepting a
    `ProcessorSupplier` with wildcard output types `(?, ?)`.
  - Reasoning: The public PAPI `addReadOnlyStateStore` enforces `Void, Void`
    output types. However, the `DSL`'s `KTableSource` needs to forward records
    downstream (emitting `K`, `Change<V>`). The internal overload allows the
    `DSL` to reuse the "read-only" wiring logic while maintaining its
    forwarding behavior.
- Added Regression Tests:
  - Added
    `TopologyTest#whenKTableSourceIsOptimizedThenTopologyShouldBeSerialPipeline`:
    verifies that the generated topology description remains identical to the
    pre-refactoring structure, ensuring strict backward compatibility.
  - Added
    `TopologyTest#whenKTableSourceIsOptimizedThenItsStateStoreShouldNotLog`:
    confirms that the optimization is correctly applied (changelog logging is
    disabled).
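
The reasoning behind the wildcard overload can be illustrated with a small, dependency-free sketch. Everything in it is a hypothetical stand-in: `Forwarder`, `Processor`, `ProcessorSupplier`, and both `addReadOnlyStateStore*` methods are simplified models written for this example, not Kafka's real classes or the code in this PR, and `String` stands in for the `K` / `Change<V>` output types. It only shows why a supplier that forwards records cannot be passed to a parameter fixed to `Void, Void` outputs, while a `?, ?` parameter accepts both kinds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class WildcardOverloadSketch {

    // Hypothetical stand-ins for illustration only; these are NOT Kafka's interfaces.
    interface Forwarder<KOut, VOut> {
        void forward(KOut key, VOut value);
    }

    interface Processor<KIn, VIn, KOut, VOut> {
        void process(KIn key, VIn value, Forwarder<KOut, VOut> downstream);
    }

    interface ProcessorSupplier<KIn, VIn, KOut, VOut> {
        Processor<KIn, VIn, KOut, VOut> get();
    }

    // Records the wiring that was requested, so the example has something to inspect.
    static final List<String> WIRING = new ArrayList<>();

    // Public-API-style method: the output types are pinned to Void, so the
    // state updater cannot forward anything downstream.
    static <K, V> void addReadOnlyStateStore(String store, String topic,
                                             ProcessorSupplier<K, V, Void, Void> updater) {
        addReadOnlyStateStoreInternal(store, topic, updater);
    }

    // Internal-style overload: wildcard output types accept both non-forwarding
    // and forwarding suppliers while sharing the same wiring logic.
    static <K, V> void addReadOnlyStateStoreInternal(String store, String topic,
                                                     ProcessorSupplier<K, V, ?, ?> updater) {
        Objects.requireNonNull(updater.get(), "supplier must return a processor");
        WIRING.add(topic + " -> " + store + " (changelog logging disabled)");
    }

    public static List<String> demo() {
        WIRING.clear();

        // PAPI-style updater: would only write to the store, forwards nothing.
        ProcessorSupplier<String, String, Void, Void> storeOnly =
            () -> (key, value, downstream) -> { };

        // DSL-style source: additionally forwards each record downstream.
        ProcessorSupplier<String, String, String, String> forwarding =
            () -> (key, value, downstream) -> downstream.forward(key, value);

        addReadOnlyStateStore("papi-store", "input-a", storeOnly);
        // addReadOnlyStateStore("dsl-store", "input-b", forwarding);
        // ^ would not compile: String output types are not Void.
        addReadOnlyStateStoreInternal("dsl-store", "input-b", forwarding);

        // Drive the forwarding processor once to show that records still flow on.
        List<String> result = new ArrayList<>(WIRING);
        forwarding.get().process("k", "v", (k, v) -> result.add("forwarded " + k + "=" + v));
        return result;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this model the `Void, Void` method simply delegates to the wildcard one, which is one plausible way to keep a single wiring path; whether the PR structures the delegation this way is not stated in the description.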
### Result
- Fixes https://issues.apache.org/jira/browse/KAFKA-16366