Thanks for the review, Gustavo. I have updated the doc and added a minimal SQL/Table example as well.

On Tue, Oct 14, 2025 at 6:03 PM Poorvank Bhatia wrote:

> Thanks Etienne,
>
> The pointers are super helpful. The proposal is very much in the same
> spirit:
>
> 1. Beam TestPipeline ⇄ PipelineTestRunner (own the job lifecycle, wire
>    sources/sinks, await completion).
> 2. Beam TestStream ⇄ ManualPipelineSource/Sink (inject elements +
>    watermarks deterministically; finish stream).
> 3. Beam PAssert ⇄ SinkAssertions (eventually/any-order/exact-order
>    checks; we’re also adding side-output assertions).
>
> The key difference is that this is driven by Flink’s runtime.
>
>
> On Mon, Oct 13, 2025 at 8:04 PM Gustavo de Morais wrote:
>
>> Hey Poorvank,
>>
>> Thanks for the proposal. It's nice that it's compatible with both the
>> Table and DataStream APIs. Could you add a basic example with the Table
>> API, or just extend the current one to show what it'd look like?
>>
>> Kind regards,
>> Gustavo
>>
>>
>> On Mon, 13 Oct 2025 at 16:14, Etienne Chauchot wrote:
>>
>> > Hi Poorvank,
>> >
>> > Thanks for this proposal. I agree with the need and the general
>> > architecture.
>> >
>> > In the Apache Beam project there was a similar test framework that
>> > allowed running a complete pipeline in a test with control over the
>> > watermark (set the watermark of the source) and processing time
>> > (change clock time), waiting for the pipeline to finish, and then
>> > doing some assertions on the output stream.
>> >
>> > Here are pointers to the different classes to take some inspiration
>> > from:
>> >
>> > - https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
>> > - https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
>> > - https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
>> >
>> > Best
>> >
>> > Etienne
>> >
>> > On 07/10/2025 at 05:57, Poorvank Bhatia wrote:
>> > > Hi Samrat,
>> > >
>> > > Thanks for the review. Please find the responses inline below.
>> > >
>> > >> 1. For complex stateful applications, verifying only the final
>> > >> output at the sink may not be sufficient. Sometimes, there is a
>> > >> need to validate the intermediate state being built up within
>> > >> operators to ensure correctness. Is there a mechanism to inspect
>> > >> and verify the contents of an operator's state?
>> > >
>> > > IMO, for fine-grained, deterministic validation of
>> > > Value/List/Map/Broadcast state and processing-time timers, the
>> > > existing Operator/Function test harnesses are the right tool.
>> > >
>> > >> 2. A common pattern in many Flink jobs is the use of side outputs
>> > >> to route different types of data to different downstream
>> > >> processes. The singular *assertSink()* method appears to target
>> > >> only the main data stream. Can you consider extending the
>> > >> interface to allow tests to capture and assert data emitted to
>> > >> named side outputs?
>> > >> This could take the form of an assertion method like
>> > >> *assertSideOutput(OutputTag<T> tag, T... expected)*, enabling
>> > >> comprehensive testing of pipelines with side output logic. WDYT?
>> > >
>> > > Agreed. We can add first-class support for side outputs by
>> > > registering OutputTag<T>s and collecting them via a named buffer,
>> > > e.g.:
>> > >
>> > > - Runner: withSideOutput(OutputTag<T> tag[, String name])
>> > > - Assertions: assertSideOutput(tag).receivedExactlyInAnyOrder(...)
>> > >
>> > > The implementation reuses the same shared sink buffer under a
>> > > namespaced key for each tag.
>> > >
>> > >> a. *sendWatermarkAndAwaitIdle()*: A method that sends a watermark
>> > >> and then blocks until the framework can determine that all
>> > >> in-flight records and timers triggered by that watermark have
>> > >> been fully processed. This would help users to add tests for
>> > >> windowed operations by removing the need for manual waits or
>> > >> sleeps.
>> > >
>> > > Not sure what would be the right way to do that. The runtime
>> > > doesn’t expose a reliable, global “quiescent after W” signal.
>> > > Watermark firings can cascade, and in-flight work sits in
>> > > mailboxes and async callbacks, so "fully processed" is
>> > > unverifiable end-to-end.
>> > >
>> > >> b. *advanceProcessingTime(Duration duration)*: Method to advance
>> > >> the notion of processing time within the test environment. This
>> > >> will help add deterministic testing logic that depends on
>> > >> processing-time timers and windows.
>> > >
>> > > AFAIK, same as above: I think the Operator/Function harnesses
>> > > already support deterministic processing-time control.
>> > >
>> > > Thanks,
>> > > Poorvank Bhatia
>> > >
>> > > On Wed, Sep 24, 2025 at 11:59 AM Samrat Deb wrote:
>> > >
>> > >> Thank you, Poorvank, for the proposal. This would be a valuable
>> > >> addition toward building reliable and well-tested Flink
>> > >> applications. The proposal is already in excellent shape.
>> > >>
>> > >> Below are my thoughts and suggestions on the proposal:
>> > >>
>> > >> 1. For complex stateful applications, verifying only the final
>> > >> output at the sink may not be sufficient. Sometimes, there is a
>> > >> need to validate the intermediate state being built up within
>> > >> operators to ensure correctness. Is there a mechanism to inspect
>> > >> and verify the contents of an operator's state?
>> > >>
>> > >> 2. A common pattern in many Flink jobs is the use of side outputs
>> > >> to route different types of data to different downstream
>> > >> processes. The singular *assertSink()* method appears to target
>> > >> only the main data stream. Can you consider extending the
>> > >> interface to allow tests to capture and assert data emitted to
>> > >> named side outputs?
>> > >> This could take the form of an assertion method like
>> > >> *assertSideOutput(OutputTag<T> tag, T... expected)*, enabling
>> > >> comprehensive testing of pipelines with side output logic. WDYT?
>> > >>
>> > >> 3. The *sendWatermark()* method is a solid foundation for
>> > >> event-time testing. However, more complex scenarios involving
>> > >> windows and timers would benefit from finer-grained control.
>> > >> My suggestion would be to consider adding the following 2 methods:
>> > >>
>> > >> a. *sendWatermarkAndAwaitIdle()*: A method that sends a watermark
>> > >> and then blocks until the framework can determine that all
>> > >> in-flight records and timers triggered by that watermark have
>> > >> been fully processed. This would help users to add tests for
>> > >> windowed operations by removing the need for manual waits or
>> > >> sleeps.
>> > >>
>> > >> b. *advanceProcessingTime(Duration duration)*: Method to advance
>> > >> the notion of processing time within the test environment. This
>> > >> will help add deterministic testing logic that depends on
>> > >> processing-time timers and windows.
>> > >>
>> > >>
>> > >> Bests,
>> > >> Samrat
>> > >>
>> > >> On Wed, Sep 17, 2025 at 1:48 AM Őrhidi Mátyás wrote:
>> > >>
>> > >>> Thanks Poorvank,
>> > >>>
>> > >>> The proposal looks great.
>> > >>>
>> > >>> Thanks,
>> > >>> Matyas
>> > >>>
>> > >>> On Thu, Sep 11, 2025 at 3:44 AM Poorvank Bhatia wrote:
>> > >>>
>> > >>>> Hi all,
>> > >>>> I’d like to propose a Pipeline Testing Framework (on Source/Sink
>> > >>>> V2) as part of Flink's testing framework, built on concepts
>> > >>>> presented in the talk “Testing Production Streaming
>> > >>>> Applications” <https://www.youtube.com/watch?v=lXHleussX9Q>
>> > >>>> (Gyula Fóra & Mátyás Orhidi).
>> > >>>>
>> > >>>> The aim is to let users write end-to-end pipeline flow tests
>> > >>>> (replacing real connectors with lightweight
>> > >>>> TestSources/TestSinks) so they can deterministically inject
>> > >>>> records/watermarks and assert outputs under parallelism.
>> > >>>>
>> > >>>> Unlike MiniCluster, local env, or operator harnesses, this
>> > >>>> provides a small Source/Sink V2-based *testkit* (send(),
>> > >>>> sendWatermark(), finish(), assertSink()), backed by isolated
>> > >>>> in-JVM buffers for reproducible control.
>> > >>>> The focus is on testing *pipeline logic and wiring*, not
>> > >>>> connector implementations themselves.
>> > >>>>
>> > >>>> For more details refer to this doc
>> > >>>> <https://docs.google.com/document/d/1lwA4BE4vHTkIBQ-IyEBjPh2l7y7Z3MGzlbOlDaKDnzE/edit?tab=t.0>.
>> > >>>> Looking forward to your feedback.
>> > >>>> Thanks,
>> > >>>> Poorvank Bhatia
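
For anyone skimming the thread, here is a rough, Flink-free sketch of the "isolated in-JVM buffer" idea, including the namespaced key per side output and an any-order check. It is only an illustration under my own assumptions: SharedSinkBuffer, collectMain, collectSide and sameElementsInAnyOrder are hypothetical names, plain strings stand in for OutputTag ids, and none of this is the API from the doc.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Toy stand-in for a shared sink buffer. Hypothetical; not Flink code. */
final class SharedSinkBuffer {
    // One map for everything; keys are namespaced as "<testId>/<output>".
    private final Map<String, List<Object>> buffers = new ConcurrentHashMap<>();
    private final String testId;

    SharedSinkBuffer(String testId) {
        this.testId = testId;
    }

    /** Records written by the main test sink. */
    void collectMain(Object record) {
        collect("main", record);
    }

    /** Records written for a side output; tagId stands in for an OutputTag id. */
    void collectSide(String tagId, Object record) {
        collect("side:" + tagId, record);
    }

    List<Object> main() {
        return snapshot("main");
    }

    List<Object> side(String tagId) {
        return snapshot("side:" + tagId);
    }

    private void collect(String output, Object record) {
        // Thread-safe list so parallel sink subtasks could append concurrently.
        buffers.computeIfAbsent(testId + "/" + output, k -> new CopyOnWriteArrayList<>())
               .add(record);
    }

    private List<Object> snapshot(String output) {
        // Copy so callers never observe later appends.
        return new ArrayList<>(
                buffers.getOrDefault(testId + "/" + output, Collections.emptyList()));
    }

    /** Multiset comparison: same elements with the same counts, order ignored. */
    static boolean sameElementsInAnyOrder(List<?> actual, Object... expected) {
        List<Object> remaining = new ArrayList<>(actual);
        for (Object e : expected) {
            if (!remaining.remove(e)) {
                return false; // expected element missing (or too few copies)
            }
        }
        return remaining.isEmpty(); // nothing unexpected left over
    }
}
```

In a test, one buffer would be created per test id, the sinks would append to it, and the assertion would compare a snapshot against the expected records. Ordering across parallel subtasks is not deterministic, which is why the any-order comparison is the natural default in this sketch.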
