Thanks Etienne, the pointers are super helpful. The proposal is very much in the same spirit:

1. Beam TestPipeline ⇄ PipelineTestRunner (owns the job lifecycle, wires sources/sinks, awaits completion).
2. Beam TestStream ⇄ ManualPipelineSource/Sink (injects elements and watermarks deterministically; finishes the stream).
3. Beam PAssert ⇄ SinkAssertions (eventually/any-order/exact-order checks; we're also adding side-output assertions).

The key difference is that this is driven by Flink's runtime. A rough usage sketch follows.
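To make the mapping concrete, here is an illustrative sketch of how a test could read end-to-end. send(), sendWatermark(), finish(), and assertSink() come from the proposal; the exact fluent shape (forJob/start/awaitCompletion) and the helpers (buildPipeline, event(), t(), the expected collections) are placeholders, not final API:

    // Illustrative sketch only; names and signatures may change as the doc evolves.
    ManualPipelineSource<Event> source = new ManualPipelineSource<>();
    OutputTag<Event> lateTag = new OutputTag<Event>("late-events") {};

    // Hypothetical entry point: wire the user topology against the test source,
    // register the side output for capture, and start the job on Flink's runtime.
    PipelineTestRunner runner =
        PipelineTestRunner.forJob(env -> buildPipeline(env, source))
            .withSideOutput(lateTag)
            .start();

    source.send(event("a", t(10)), event("b", t(20))); // inject records deterministically
    source.sendWatermark(t(30));                       // advance event time
    source.finish();                                   // bound the stream so the job can drain
    runner.awaitCompletion();                          // block until the pipeline finishes

    // Assertions run against the isolated in-JVM sink buffers.
    runner.assertSink().receivedExactlyInAnyOrder(expectedMainOutput);
    runner.assertSideOutput(lateTag).receivedExactlyInAnyOrder(expectedLateEvents);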
On Mon, Oct 13, 2025 at 8:04 PM Gustavo de Morais <[email protected]> wrote:

> Hey Poorvank,
>
> Thanks for the proposal. It's nice that it's compatible with both the
> Table & DataStream APIs. Could you add a basic example with the Table
> API, or just extend the current one to show how it'd look?
>
> Kind regards,
> Gustavo
>
> On Mon, 13 Oct 2025 at 16:14, Etienne Chauchot <[email protected]> wrote:
>
> > Hi Poorvank,
> >
> > Thanks for this proposition. I agree with the need and the general
> > architecture.
> >
> > In the Apache Beam project there was a similar test framework that
> > allowed running a complete pipeline in a test with control over the
> > watermark (set the watermark of the source) and processing time (change
> > the clock time), waiting for the pipeline to finish, and then doing
> > some assertions on the output stream.
> >
> > Here are the pointers to the different classes to take some inspiration
> > from:
> >
> > - https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
> > - https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
> > - https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
> >
> > Best
> >
> > Etienne
> >
> > On 07/10/2025 at 05:57, Poorvank Bhatia wrote:
> > > Hi Samrat,
> > >
> > > Thanks for the review. Please find the responses inline below.
> > >
> > >> 1. For complex stateful applications, verifying only the final output
> > >> at the sink may not be sufficient. Sometimes, there is a need to
> > >> validate the intermediate state being built up within operators to
> > >> ensure correctness. Is there a mechanism to inspect and verify the
> > >> contents of an operator's state?
> > >
> > > IMO, for fine-grained, deterministic validation of
> > > Value/List/Map/Broadcast state and processing-time timers, the
> > > existing Operator/Function test harnesses are the right tool.
> > >
> > >> 2. A common pattern in many Flink jobs is the use of side outputs to
> > >> route different types of data to different downstream processes. The
> > >> singular *assertSink()* method appears to target only the main data
> > >> stream. Can you consider extending the interface to allow tests to
> > >> capture and assert data emitted to named side outputs? This could
> > >> take the form of an assertion method like
> > >> *assertSideOutput(OutputTag<T> tag, T... expected)*, enabling
> > >> comprehensive testing of pipelines with side output logic. WDYT?
> > >
> > > Agreed. We can add first-class support for side outputs by registering
> > > OutputTag<T>s and collecting them via a named buffer, e.g.:
> > >
> > > - Runner: withSideOutput(OutputTag<T> tag[, String name])
> > > - Assertions: assertSideOutput(tag).receivedExactlyInAnyOrder(...)
> > >
> > > The implementation reuses the same shared sink buffer under a
> > > namespaced key for each tag.
> > >
> > >> a. *sendWatermarkAndAwaitIdle()*: A method that sends a watermark and
> > >> then blocks until the framework can determine that all in-flight
> > >> records and timers triggered by that watermark have been fully
> > >> processed. This would help users add tests for windowed operations by
> > >> removing the need for manual waits or sleeps.
> > >
> > > I'm not sure what the right way to do that would be. The runtime
> > > doesn't expose a reliable, global "quiescent after W" signal.
> > > Watermark firings can cascade, and in-flight work sits in mailboxes
> > > and async callbacks, so "fully processed" is unverifiable end-to-end.
> > >
> > >> b. *advanceProcessingTime(Duration duration)*: Method to advance the
> > >> notion of processing time within the test environment. This will help
> > >> add deterministic testing logic that depends on processing-time
> > >> timers and windows.
> > >
> > > Same as above: AFAIK the Operator/Function harnesses already support
> > > deterministic processing-time control.
> > >
> > > Thanks,
> > > Poorvank Bhatia
> > >
> > > On Wed, Sep 24, 2025 at 11:59 AM Samrat Deb <[email protected]> wrote:
> > >
> > >> Thank you, Poorvank, for the proposal. This would be a valuable
> > >> addition toward building reliable and well-tested Flink applications.
> > >> The proposal is already in excellent shape.
> > >>
> > >> Below are my take and suggestions on the proposal:
> > >>
> > >> 1. For complex stateful applications, verifying only the final output
> > >> at the sink may not be sufficient. Sometimes, there is a need to
> > >> validate the intermediate state being built up within operators to
> > >> ensure correctness. Is there a mechanism to inspect and verify the
> > >> contents of an operator's state?
> > >>
> > >> 2. A common pattern in many Flink jobs is the use of side outputs to
> > >> route different types of data to different downstream processes. The
> > >> singular *assertSink()* method appears to target only the main data
> > >> stream. Can you consider extending the interface to allow tests to
> > >> capture and assert data emitted to named side outputs? This could
> > >> take the form of an assertion method like
> > >> *assertSideOutput(OutputTag<T> tag, T... expected)*, enabling
> > >> comprehensive testing of pipelines with side output logic. WDYT?
> > >>
> > >> 3. The *sendWatermark()* method is a solid foundation for event-time
> > >> testing. However, more complex scenarios involving windows and timers
> > >> would benefit from finer-grained control. My suggestion would be to
> > >> consider adding the following two methods:
> > >>
> > >> a. *sendWatermarkAndAwaitIdle()*: A method that sends a watermark and
> > >> then blocks until the framework can determine that all in-flight
> > >> records and timers triggered by that watermark have been fully
> > >> processed. This would help users add tests for windowed operations by
> > >> removing the need for manual waits or sleeps.
> > >>
> > >> b. *advanceProcessingTime(Duration duration)*: Method to advance the
> > >> notion of processing time within the test environment. This will help
> > >> add deterministic testing logic that depends on processing-time
> > >> timers and windows.
> > >>
> > >> Bests,
> > >> Samrat
> > >>
> > >> On Wed, Sep 17, 2025 at 1:48 AM Őrhidi Mátyás <[email protected]> wrote:
> > >>
> > >>> Thanks Poorvank,
> > >>>
> > >>> The proposal looks great.
> > >>>
> > >>> Thanks,
> > >>> Matyas
> > >>>
> > >>> On Thu, Sep 11, 2025 at 3:44 AM Poorvank Bhatia <[email protected]> wrote:
> > >>>
> > >>>> Hi all,
> > >>>>
> > >>>> I'd like to propose a Pipeline Testing Framework (on Source/Sink V2)
> > >>>> as part of Flink's testing framework, built on concepts presented
> > >>>> in the talk "Testing Production Streaming Applications"
> > >>>> <https://www.youtube.com/watch?v=lXHleussX9Q> (Gyula Fóra & Mátyás
> > >>>> Orhidi).
> > >>>>
> > >>>> The aim is to let users write end-to-end pipeline flow tests,
> > >>>> replacing real connectors with lightweight TestSources/TestSinks,
> > >>>> so they can deterministically inject records/watermarks and assert
> > >>>> outputs under parallelism.
> > >>>>
> > >>>> Unlike the MiniCluster, local environments, or operator harnesses,
> > >>>> this provides a small Source/Sink V2-based *testkit* (send(),
> > >>>> sendWatermark(), finish(), assertSink()), backed by isolated in-JVM
> > >>>> buffers for reproducible control. The focus is on testing *pipeline
> > >>>> logic and wiring*, not connector implementations themselves.
> > >>>>
> > >>>> For more details, refer to this doc:
> > >>>> https://docs.google.com/document/d/1lwA4BE4vHTkIBQ-IyEBjPh2l7y7Z3MGzlbOlDaKDnzE/edit?tab=t.0
> > >>>>
> > >>>> Looking forward to your feedback.
> > >>>>
> > >>>> Thanks,
> > >>>> Poorvank Bhatia