Hey Poorvank! +1 from my side. I think this would be a nice addition to Flink. The built-in testing utilities are more targeted towards single-operator tests or completely black-box testing.
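To make the contrast concrete, a single-operator test with the existing built-in utilities looks roughly like this (an illustrative sketch; it assumes the `OneInputStreamOperatorTestHarness` from the flink-streaming-java test jar, wrapping a trivial `StreamMap` operator):

```java
// Illustrative single-operator test in the existing style; assumes the
// OneInputStreamOperatorTestHarness from the flink-streaming-java test jar.
import java.util.List;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

public class UppercaseOperatorTest {
    public void testOperatorInIsolation() throws Exception {
        OneInputStreamOperatorTestHarness<String, String> harness =
                new OneInputStreamOperatorTestHarness<>(
                        new StreamMap<>((MapFunction<String, String>) String::toUpperCase));
        harness.open();
        harness.processElement(new StreamRecord<>("a", 1L)); // element with timestamp
        harness.processWatermark(10L);                        // deterministic event time
        harness.setProcessingTime(1_000L);                    // deterministic clock
        List<String> out = harness.extractOutputValues();     // inspect emitted records
        harness.close();
    }
}
```

This exercises one operator in isolation with full control over time, but it says nothing about how the whole pipeline is wired together, which is the gap the proposal targets.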
In our experience we have found pipeline testing utils exceedingly useful.

Cheers,
Gyula

On 2025/10/14 12:33:21 Poorvank Bhatia wrote:
> Thanks Gustavo,
>
> Thanks for the review.
> I have modified the doc and added a minimal SQL/Table example as well.
>
> On Tue, Oct 14, 2025 at 6:03 PM Poorvank Bhatia <[email protected]> wrote:
>
> > Thanks Etienne,
> >
> > The pointers are super helpful. The proposal is very much in the same spirit:
> >
> > 1. Beam TestPipeline ⇄ PipelineTestRunner (owns the job lifecycle, wires
> >    sources/sinks, awaits completion).
> > 2. Beam TestStream ⇄ ManualPipelineSource/Sink (injects elements and
> >    watermarks deterministically; finishes the stream).
> > 3. Beam PAssert ⇄ SinkAssertions (eventually/any-order/exact-order checks;
> >    we’re also adding side-output assertions).
> >
> > The key difference is that this is driven by Flink’s runtime.
> >
> > On Mon, Oct 13, 2025 at 8:04 PM Gustavo de Morais <[email protected]> wrote:
> >
> >> Hey Poorvank,
> >>
> >> Thanks for the proposal. It's nice that it's compatible with both the
> >> Table and DataStream APIs. Could you add a basic example with the Table
> >> API, or just extend the current one to show how it'd look?
> >>
> >> Kind regards,
> >> Gustavo
> >>
> >> On Mon, 13 Oct 2025 at 16:14, Etienne Chauchot <[email protected]> wrote:
> >>
> >> > Hi Poorvank,
> >> >
> >> > Thanks for this proposal. I agree with the need and with the general
> >> > architecture.
> >> >
> >> > The Apache Beam project has a similar test framework that allows running
> >> > a complete pipeline in a test with control over the watermark (setting
> >> > the source's watermark) and over processing time (changing the clock),
> >> > waiting for the pipeline to finish, and then making assertions on the
> >> > output stream.
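Concretely, those Beam utilities compose roughly like this (an illustrative sketch, assuming the standard Beam testing APIs; a real test would apply the transforms under test between the source and the assertion):

```java
// Illustrative sketch of the Beam test framework described above.
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class BeamStyleTest {
    public void runsPipelineDeterministically() {
        TestPipeline p = TestPipeline.create();

        TestStream<String> input =
                TestStream.create(StringUtf8Coder.of())
                        .addElements("a", "b")                               // inject elements
                        .advanceWatermarkTo(new Instant(1000L))              // set source watermark
                        .advanceProcessingTime(Duration.standardSeconds(5))  // move the clock
                        .advanceWatermarkToInfinity();                       // end of input

        PCollection<String> output = p.apply(input); // transforms under test go here

        PAssert.that(output).containsInAnyOrder("a", "b"); // assert on the output stream
        p.run().waitUntilFinish();                          // wait for pipeline finish
    }
}
```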
> >> > Here are the pointers to the different classes to take some inspiration from:
> >> >
> >> > - https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
> >> > - https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
> >> > - https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
> >> >
> >> > Best
> >> >
> >> > Etienne
> >> >
> >> > On 07/10/2025 at 05:57, Poorvank Bhatia wrote:
> >> > > Hi Samrat,
> >> > >
> >> > > Thanks for the review. Please find the responses inline below.
> >> > >
> >> > >> 1. For complex stateful applications, verifying only the final output
> >> > >> at the sink may not be sufficient. Sometimes, there is a need to
> >> > >> validate the intermediate state being built up within operators to
> >> > >> ensure correctness. Is there a mechanism to inspect and verify the
> >> > >> contents of an operator's state?
> >> > >
> >> > > IMO, for fine-grained, deterministic validation of Value/List/Map/Broadcast
> >> > > state and of processing-time timers, the existing Operator/Function test
> >> > > harnesses are the right tool.
> >> > >
> >> > >> 2. A common pattern in many Flink jobs is the use of side outputs to
> >> > >> route different types of data to different downstream processes. The
> >> > >> singular *assertSink()* method appears to target only the main data
> >> > >> stream. Could you consider extending the interface to allow tests to
> >> > >> capture and assert data emitted to named side outputs?
> >> > >> This could take the form of an assertion method like
> >> > >> *assertSideOutput(OutputTag<T> tag, T... expected)*, enabling
> >> > >> comprehensive testing of pipelines with side output logic.
> >> > >> WDUT?
> >> > >
> >> > > Agreed. We can add first-class support for side outputs by
> >> > > registering OutputTag<T>s and collecting them via a named buffer, e.g.:
> >> > >
> >> > > - Runner: withSideOutput(OutputTag<T> tag[, String name])
> >> > > - Assertions: assertSideOutput(tag).receivedExactlyInAnyOrder(...)
> >> > >
> >> > > The implementation reuses the same shared sink buffer under a namespaced
> >> > > key for each tag.
> >> > >
> >> > >> a. *sendWatermarkAndAwaitIdle()*: A method that sends a watermark
> >> > >> and then blocks until the framework can determine that all in-flight
> >> > >> records and timers triggered by that watermark have been fully
> >> > >> processed. This would help users add tests for windowed operations by
> >> > >> removing the need for manual waits or sleeps.
> >> > >
> >> > > I'm not sure what the right way to do that would be. The runtime doesn't
> >> > > expose a reliable, global "quiescent after W" signal: watermark firings
> >> > > can cascade, and in-flight work sits in mailboxes and async callbacks,
> >> > > so "fully processed" is unverifiable end-to-end.
> >> > >
> >> > >> b. *advanceProcessingTime(Duration duration)*: Method to advance
> >> > >> the notion of processing time within the test environment. This will
> >> > >> help add deterministic testing logic that depends on processing-time
> >> > >> timers and windows.
> >> > >
> >> > > AFAIK, same as above: I think the Operator/Function harnesses already
> >> > > support deterministic processing-time control.
> >> > >
> >> > > Thanks,
> >> > > Poorvank Bhatia
> >> > >
> >> > > On Wed, Sep 24, 2025 at 11:59 AM Samrat Deb <[email protected]> wrote:
> >> > >
> >> >> Thank you, Poorvank, for the proposal. This would be a valuable addition
> >> >> toward building reliable and well-tested Flink applications.
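The "shared sink buffer under a namespaced key" idea can be sketched in plain Java (a hypothetical illustration only; `SharedSinkBuffer`, `emit`, and `collected` are placeholder names, not the proposal's actual classes):

```java
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: the main sink and every registered side output append
// to one shared in-JVM map, each under its own "<runId>/<tagName>" key, so
// assertions can read any of them by name once the job finishes. Keying by
// runId keeps concurrently running tests isolated from each other.
public class SharedSinkBuffer {

    private static final Map<String, Queue<Object>> BUFFERS = new ConcurrentHashMap<>();

    private static Queue<Object> buffer(String key) {
        return BUFFERS.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>());
    }

    // Called by a test sink (or side-output collector) at any parallelism.
    public static void emit(String runId, String tagName, Object value) {
        buffer(runId + "/" + tagName).add(value);
    }

    // Called by assertions after the pipeline has finished.
    public static List<Object> collected(String runId, String tagName) {
        return List.copyOf(buffer(runId + "/" + tagName));
    }
}
```

A main-stream sink would emit under something like `"run-1/main"`, while a side output registered for a tag would emit under `"run-1/side:late"`; assertions then read each buffer independently.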
> >> >> The proposal is already in excellent shape.
> >> >>
> >> >> Below are my take and suggestions on the proposal:
> >> >>
> >> >> 1. For complex stateful applications, verifying only the final output at
> >> >> the sink may not be sufficient. Sometimes, there is a need to validate
> >> >> the intermediate state being built up within operators to ensure
> >> >> correctness. Is there a mechanism to inspect and verify the contents of
> >> >> an operator's state?
> >> >>
> >> >> 2. A common pattern in many Flink jobs is the use of side outputs to
> >> >> route different types of data to different downstream processes. The
> >> >> singular *assertSink()* method appears to target only the main data
> >> >> stream. Could you consider extending the interface to allow tests to
> >> >> capture and assert data emitted to named side outputs?
> >> >> This could take the form of an assertion method like
> >> >> *assertSideOutput(OutputTag<T> tag, T... expected)*, enabling
> >> >> comprehensive testing of pipelines with side output logic. WDUT?
> >> >>
> >> >> 3. The *sendWatermark()* method is a solid foundation for event-time
> >> >> testing. However, more complex scenarios involving windows and timers
> >> >> would benefit from finer-grained control.
> >> >> My suggestion would be to consider adding the following two methods:
> >> >>
> >> >> a. *sendWatermarkAndAwaitIdle()*: A method that sends a watermark
> >> >> and then blocks until the framework can determine that all in-flight
> >> >> records and timers triggered by that watermark have been fully
> >> >> processed. This would help users add tests for windowed operations by
> >> >> removing the need for manual waits or sleeps.
> >> > >> > >> > >> b.* advanceProcessingTime(Duration duration)*: Method to > >> > advance > >> > >> the notion of processing time within the test environment. This will > >> > help > >> > >> add deterministic testing logic that depends on processing-time > >> timers > >> > and > >> > >> windows. > >> > >> > >> > >> > >> > >> Bests, > >> > >> Samrat > >> > >> > >> > >> On Wed, Sep 17, 2025 at 1:48 AM Őrhidi Mátyás < > >> [email protected]> > >> > >> wrote: > >> > >> > >> > >>> Thanks Poorvank, > >> > >>> > >> > >>> The proposal looks great. > >> > >>> > >> > >>> Thanks, > >> > >>> Matyas > >> > >>> > >> > >>> On Thu, Sep 11, 2025 at 3:44 AM Poorvank Bhatia < > >> > [email protected]> > >> > >>> wrote: > >> > >>> > >> > >>>> Hi all, > >> > >>>> I’d like to propose a Pipeline Testing Framework (on Source/Sink > >> V2) > >> > >> as a > >> > >>>> part of flink's testing framework, built on concepts presented by > >> the > >> > >>> talk > >> > >>>> *“*Testing Production Streaming Applications > >> > >>>> <https://www.youtube.com/watch?v=lXHleussX9Q>” (Gyula Fóra & > >> Mátyás > >> > >>>> Orhidi). > >> > >>>> > >> > >>>> The aim is to let users write end-to-end pipeline flow tests — > >> > >> replacing > >> > >>>> real connectors with lightweight TestSources/TestSinks — so they > >> can > >> > >>>> deterministically inject records/watermarks and assert outputs > >> under > >> > >>>> parallelism. > >> > >>>> > >> > >>>> Unlike MiniCluster, local env, or operator harnesses, this > >> provides a > >> > >>> small > >> > >>>> Source/Sink V2-based *testkit* (send(), sendWatermark(), finish(), > >> > >>>> assertSink()), backed by isolated in-JVM buffers for reproducible > >> > >>> control. > >> > >>>> The focus is on testing *pipeline logic and wiring*, not connector > >> > >>>> implementations themselves. 
> >> >>>> For more details, refer to this doc:
> >> >>>> <https://docs.google.com/document/d/1lwA4BE4vHTkIBQ-IyEBjPh2l7y7Z3MGzlbOlDaKDnzE/edit?tab=t.0>
> >> >>>>
> >> >>>> Looking forward to your feedback.
> >> >>>> Thanks,
> >> >>>> Poorvank Bhatia
