Hi Samrat,

Thanks for the review. Please find the responses inline below.

> 1. For complex stateful applications, verifying only the final output at
> the sink may not be sufficient. Sometimes, there is a need to validate the
> intermediate state being built up within operators to ensure correctness.
> Is there a mechanism to inspect and verify the contents of an operator's
> state?

IMO, for fine-grained, deterministic validation of Value/List/Map/Broadcast
state and processing-time timers, the existing Operator/Function test
harnesses are the right tool.

> 2. A common pattern in many of the Flink jobs is the use of side outputs to
> route different types of data to different downstream processes. The
> singular *assertSink()* method appears to target only the main data stream.
> Can you consider extending the interface to allow tests to capture and
> assert data emitted to named side outputs?
> This could take the form of an assertion method like
> *assertSideOutput(OutputTag<T> tag, T... expected)*, enabling
> comprehensive testing of pipelines with side output logic. WDUT?

Agreed. We can add first-class support for side outputs by registering
OutputTag<T>s and collecting them via a named buffer, e.g.:

  - Runner: withSideOutput(OutputTag<T> tag[, String name])
  - Assertions: assertSideOutput(tag).receivedExactlyInAnyOrder(...)

The implementation reuses the same shared sink buffer under a namespaced key
for each tag.

> a. *sendWatermarkAndAwaitIdle()*: A method that sends a watermark
> and then blocks until the framework can determine that all in-flight
> records and timers triggered by that watermark have been fully processed.
> This would help users to add tests for windowed operations by removing the
> need for manual waits or sleeps.

I'm not sure what the right way to do that would be. The runtime doesn't
expose a reliable, global “quiescent after W” signal. Watermark firings can
cascade, and in-flight work sits in mailboxes and async callbacks, so
"fully processed" is unverifiable end-to-end.
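To make the side-output idea from point 2 a bit more concrete, here is a rough, self-contained Java sketch of the namespaced-buffer approach. It is only an illustration under assumptions: Tag stands in for Flink's OutputTag<T>, and SharedBuffer, collectSide, sideKey and the "<sink>/side/<tag id>" key layout are hypothetical names, not part of Flink or of the proposed testkit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical sketch only; none of these names exist in Flink or in the proposal. */
final class SideOutputBufferSketch {

    /** Stand-in for Flink's OutputTag<T>; only the id is needed here. */
    public static final class Tag<T> {
        final String id;
        public Tag(String id) { this.id = id; }
    }

    /** Shared in-JVM buffer holding one record list per namespaced key. */
    public static final class SharedBuffer {
        private final Map<String, List<Object>> buffers = new ConcurrentHashMap<>();

        // Assumed key layout: side outputs live next to the main sink buffer.
        private static String sideKey(String sinkName, Tag<?> tag) {
            return sinkName + "/side/" + tag.id;
        }

        /** Called by the (hypothetical) test sink for every side-output record. */
        public <T> void collectSide(String sinkName, Tag<T> tag, T record) {
            buffers.computeIfAbsent(sideKey(sinkName, tag),
                    k -> new CopyOnWriteArrayList<>()).add(record);
        }

        /** True if the buffered records equal the expected ones as a multiset. */
        @SafeVarargs
        public final <T> boolean receivedExactlyInAnyOrder(
                String sinkName, Tag<T> tag, T... expected) {
            List<Object> actual =
                    new ArrayList<>(buffers.getOrDefault(sideKey(sinkName, tag), List.of()));
            if (actual.size() != expected.length) {
                return false;
            }
            for (T e : expected) {
                if (!actual.remove(e)) { // removes one matching occurrence
                    return false;
                }
            }
            return true;
        }
    }

    public static void main(String[] args) {
        SharedBuffer buffer = new SharedBuffer();
        Tag<String> late = new Tag<>("late-events");
        buffer.collectSide("orders-sink", late, "e1");
        buffer.collectSide("orders-sink", late, "e2");
        System.out.println(buffer.receivedExactlyInAnyOrder("orders-sink", late, "e2", "e1")); // true
        System.out.println(buffer.receivedExactlyInAnyOrder("orders-sink", late, "e1"));       // false
    }
}
```

Keying by sink name plus tag id keeps side-output records apart from the main sink records while still using a single shared map, which is the property the namespaced key is meant to provide. A real implementation would also have to handle parallel subtasks and per-test isolation.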
> b. *advanceProcessingTime(Duration duration)*: Method to advance
> the notion of processing time within the test environment. This will help
> add deterministic testing logic that depends on processing-time timers and
> windows.

AFAIK, same as above: I think the Operator/Function harnesses already
support deterministic processing-time control.

Thanks,
Poorvank Bhatia

On Wed, Sep 24, 2025 at 11:59 AM Samrat Deb <[email protected]> wrote:

> Thank you, Poorvank, for the proposal. This would be a valuable addition
> toward building reliable and well-tested Flink applications. The proposal
> is already in excellent shape.
>
> Below are my take and suggestions on the proposal :
>
> 1. For complex stateful applications, verifying only the final output at
> the sink may not be sufficient. Sometimes, there is a need to validate the
> intermediate state being built up within operators to ensure correctness.
> Is there a mechanism to inspect and verify the contents of an operator's
> state?
>
> 2. A common pattern in many of the Flink jobs is the use of side outputs to
> route different types of data to different downstream processes. The
> singular *assertSink()* method appears to target only the main data stream.
> Can you consider extending the interface to allow tests to capture and
> assert data emitted to named side outputs?
> This could take the form of an assertion method like
> *assertSideOutput(OutputTag<T> tag, T... expected)*, enabling
> comprehensive testing of pipelines with side output logic. WDUT?
>
> 3. The *sendWatermark()* method is a solid foundation for event-time
> testing. However, more complex scenarios involving windows and timers would
> benefit from finer-grained control.
> My suggestion would be to consider adding the following 2 methods :
>
> a. *sendWatermarkAndAwaitIdle()*: A method that sends a watermark
> and then blocks until the framework can determine that all in-flight
> records and timers triggered by that watermark have been fully processed.
> This would help users to add tests for windowed operations by removing the
> need for manual waits or sleeps.
>
> b. *advanceProcessingTime(Duration duration)*: Method to advance
> the notion of processing time within the test environment. This will help
> add deterministic testing logic that depends on processing-time timers and
> windows.
>
> Bests,
> Samrat
>
> On Wed, Sep 17, 2025 at 1:48 AM Őrhidi Mátyás <[email protected]> wrote:
>
> > Thanks Poorvank,
> >
> > The proposal looks great.
> >
> > Thanks,
> > Matyas
> >
> > On Thu, Sep 11, 2025 at 3:44 AM Poorvank Bhatia <[email protected]> wrote:
> >
> > > Hi all,
> > >
> > > I’d like to propose a Pipeline Testing Framework (on Source/Sink V2) as
> > > a part of Flink's testing framework, built on concepts presented by the
> > > talk “Testing Production Streaming Applications”
> > > <https://www.youtube.com/watch?v=lXHleussX9Q> (Gyula Fóra & Mátyás
> > > Orhidi).
> > >
> > > The aim is to let users write end-to-end pipeline flow tests, replacing
> > > real connectors with lightweight TestSources/TestSinks, so they can
> > > deterministically inject records/watermarks and assert outputs under
> > > parallelism.
> > >
> > > Unlike MiniCluster, local env, or operator harnesses, this provides a
> > > small Source/Sink V2-based *testkit* (send(), sendWatermark(), finish(),
> > > assertSink()), backed by isolated in-JVM buffers for reproducible
> > > control. The focus is on testing *pipeline logic and wiring*, not
> > > connector implementations themselves.
> > >
> > > For more details refer to this doc
> > > <https://docs.google.com/document/d/1lwA4BE4vHTkIBQ-IyEBjPh2l7y7Z3MGzlbOlDaKDnzE/edit?tab=t.0>.
> > >
> > > Looking forward to your feedback.
> > >
> > > Thanks,
> > > Poorvank Bhatia
