Hey Poorvank!

+1 from my side. I think this would be a nice addition to Flink. The built in 
testing utilities are more targeted towards single operator tests/completely 
black box testing.

In our experience we have found pipeline testing utils exceedingly useful.

Cheers
Gyula

On 2025/10/14 12:33:21 Poorvank Bhatia wrote:
> Thanks Gustavo,
> 
> Thanks for the review.
> I have modified the doc and added a minimal SQL/Table example as well.
> 
> On Tue, Oct 14, 2025 at 6:03 PM Poorvank Bhatia <[email protected]>
> wrote:
> 
> > Thanks Etienne,
> >
> > The pointers are super helpful. The proposal is very much in the same
> > spirit:
> >
> >    1. Beam TestPipeline ⇄ PipelineTestRunner (own the job lifecycle, wire
> >    sources/sinks, await completion).
> >    2. Beam TestStream ⇄ ManualPipelineSource/Sink (inject elements +
> >    watermarks deterministically; finish stream).
> >    3. Beam PAssert ⇄ SinkAssertions (eventually/any-order/exact-order
> >    checks; we’re also adding side-output assertions).
> >
> > The key difference is that this is driven by Flink’s runtime.
> >
> >
> > On Mon, Oct 13, 2025 at 8:04 PM Gustavo de Morais <[email protected]>
> > wrote:
> >
> >> Hey Poorvank,
> >>
> >> Thanks for the proposal. It's nice that it's compatible with both Table &
> >> DataStream API’s. Could you add a basic example with TableAPI or just
> >> extend the current one showing how it'd look like?
> >>
> >> Kind regards,
> >> Gustavo
> >>
> >>
> >>
> >> On Mon, 13 Oct 2025 at 16:14, Etienne Chauchot <[email protected]>
> >> wrote:
> >>
> >> > Hi Poorvank,
> >> >
> >> > Thanks for this proposition. I agree with the need and the general
> >> > architecture.
> >> >
> >> > In Apache Beam project there was a similar test framework that allowed
> >> > to run a complete pipeline in test with control on watermark (set the
> >> > watermark of the source), processing time (change clock time), wait for
> >> > pipeline finish and then do some assertions on the output stream.
> >> >
> >> > Here are the pointers to the different classes to take some inspiration
> >> > from:
> >> >
> >> > -
> >> >
> >> >
> >> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
> >> >
> >> > -
> >> >
> >> >
> >> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
> >> >
> >> > -
> >> >
> >> >
> >> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
> >> >
> >> > Best
> >> >
> >> > Etienne
> >> >
> >> > Le 07/10/2025 à 05:57, Poorvank Bhatia a écrit :
> >> > > Hi Samrat,
> >> > >
> >> > > Thanks for the review. Please find the responses inline below.
> >> > >
> >> > > 1. For complex stateful applications, verifying only the final output
> >> at
> >> > >> the sink may not be sufficient. Sometimes, there is a need to
> >> validate
> >> > the
> >> > >> intermediate state being built up within operators to ensure
> >> > correctness.
> >> > >> Is there a mechanism to inspect and verify the contents of an
> >> operator's
> >> > >> state?
> >> > >
> >> > >   IMO For fine-grained, deterministic validation of
> >> > Value/List/Map/Broadcast
> >> > > state and processing-time timers, the existing Operator/Function test
> >> > > harnesses are the right tool
> >> > >
> >> > > A common pattern in many of the Flink jobs is the use of side outputs
> >> to
> >> > >> route different types of data to different downstream processes. The
> >> > >> singular *assertSink() *method appears to target only the main data
> >> > stream.
> >> > >> Can you consider extending the interface to allow tests to capture
> >> and
> >> > >> assert data emitted to named side outputs?
> >> > >> This could take the form of an assertion method like
> >> > >> *assertSideOutput(OutputTag<T>
> >> > >> tag, T... expected)*, enabling comprehensive testing of pipelines
> >> with
> >> > side
> >> > >> output logic. WDUT?
> >> > >
> >> > > Agreed. Can add first-class support for side outputs by
> >> > > registering OutputTag<T>s and collecting them via a named buffer,
> >> e.g.:
> >> > >
> >> > >     -
> >> > >
> >> > >     Runner: withSideOutput(OutputTag<T> tag[, String name])
> >> > >     -
> >> > >
> >> > >     Assertions: assertSideOutput(tag).receivedExactlyInAnyOrder(...)
> >> > >     Implementation reuses the same shared sink buffer under a
> >> namespaced
> >> > key
> >> > >     for each tag.
> >> > >
> >> > >
> >> > >    a. *sendWatermarkAndAwaitIdle()*: A method that sends a watermark
> >> > >> and then blocks until the framework can determine that all in-flight
> >> > >> records and timers triggered by that watermark have been fully
> >> > processed.
> >> > >> This would help users to add tests for windowed operations by
> >> removing
> >> > the
> >> > >> need for manual waits or sleeps.
> >> > >
> >> > > Not sure what would be the right way to do that. The runtime doesn’t
> >> > expose
> >> > > a reliable, global “quiescent after W” signal. Watermark firings can
> >> > > cascade, and in-flight work sits in mailboxes, and async callbacks—so
> >> > > "fully processed" is unverifiable end-to-end.
> >> > >
> >> > >
> >> > >>           b.* advanceProcessingTime(Duration duration)*: Method to
> >> > advance
> >> > >> the notion of processing time within the test environment. This will
> >> > help
> >> > >> add deterministic testing logic that depends on processing-time
> >> timers
> >> > and
> >> > >> windows.
> >> > >>
> >> > >> AFAIK, same as above i think the the Operator/Function harnesses
> >> already
> >> > > support deterministic processing-time control.
> >> > >
> >> > > Thanks,
> >> > > Poorvank Bhatia
> >> > >
> >> > > On Wed, Sep 24, 2025 at 11:59 AM Samrat Deb <[email protected]>
> >> > wrote:
> >> > >
> >> > >> Thank you, Poorvank, for the proposal. This would be a valuable
> >> addition
> >> > >> toward building reliable and well-tested Flink applications. The
> >> > proposal
> >> > >> is already in excellent shape.
> >> > >>
> >> > >> Below are my take and suggestions on the proposal :
> >> > >>
> >> > >> 1. For complex stateful applications, verifying only the final
> >> output at
> >> > >> the sink may not be sufficient. Sometimes, there is a need to
> >> validate
> >> > the
> >> > >> intermediate state being built up within operators to ensure
> >> > correctness.
> >> > >> Is there a mechanism to inspect and verify the contents of an
> >> operator's
> >> > >> state?
> >> > >>
> >> > >> 2. A common pattern in many of the Flink jobs is the use of side
> >> > outputs to
> >> > >> route different types of data to different downstream processes. The
> >> > >> singular *assertSink() *method appears to target only the main data
> >> > stream.
> >> > >> Can you consider extending the interface to allow tests to capture
> >> and
> >> > >> assert data emitted to named side outputs?
> >> > >> This could take the form of an assertion method like
> >> > >> *assertSideOutput(OutputTag<T>
> >> > >> tag, T... expected)*, enabling comprehensive testing of pipelines
> >> with
> >> > side
> >> > >> output logic. WDUT?
> >> > >>
> >> > >> 3. The *sendWatermark()* method is a solid foundation for event-time
> >> > >> testing. However, more complex scenarios involving windows and timers
> >> > would
> >> > >> benefit from finer-grained control.
> >> > >> My suggestion would be to consider adding the following 2 methods :
> >> > >>
> >> > >>           a. *sendWatermarkAndAwaitIdle()*: A method that sends a
> >> > watermark
> >> > >> and then blocks until the framework can determine that all in-flight
> >> > >> records and timers triggered by that watermark have been fully
> >> > processed.
> >> > >> This would help users to add tests for windowed operations by
> >> removing
> >> > the
> >> > >> need for manual waits or sleeps.
> >> > >>
> >> > >>           b.* advanceProcessingTime(Duration duration)*: Method to
> >> > advance
> >> > >> the notion of processing time within the test environment. This will
> >> > help
> >> > >> add deterministic testing logic that depends on processing-time
> >> timers
> >> > and
> >> > >> windows.
> >> > >>
> >> > >>
> >> > >> Bests,
> >> > >> Samrat
> >> > >>
> >> > >> On Wed, Sep 17, 2025 at 1:48 AM Őrhidi Mátyás <
> >> [email protected]>
> >> > >> wrote:
> >> > >>
> >> > >>> Thanks Poorvank,
> >> > >>>
> >> > >>> The proposal looks great.
> >> > >>>
> >> > >>> Thanks,
> >> > >>> Matyas
> >> > >>>
> >> > >>> On Thu, Sep 11, 2025 at 3:44 AM Poorvank Bhatia <
> >> > [email protected]>
> >> > >>> wrote:
> >> > >>>
> >> > >>>> Hi all,
> >> > >>>> I’d like to propose a Pipeline Testing Framework (on Source/Sink
> >> V2)
> >> > >> as a
> >> > >>>> part of flink's testing framework, built on concepts presented by
> >> the
> >> > >>> talk
> >> > >>>> *“*Testing Production Streaming Applications
> >> > >>>> <https://www.youtube.com/watch?v=lXHleussX9Q>” (Gyula Fóra &
> >> Mátyás
> >> > >>>> Orhidi).
> >> > >>>>
> >> > >>>> The aim is to let users write end-to-end pipeline flow tests —
> >> > >> replacing
> >> > >>>> real connectors with lightweight TestSources/TestSinks — so they
> >> can
> >> > >>>> deterministically inject records/watermarks and assert outputs
> >> under
> >> > >>>> parallelism.
> >> > >>>>
> >> > >>>> Unlike MiniCluster, local env, or operator harnesses, this
> >> provides a
> >> > >>> small
> >> > >>>> Source/Sink V2-based *testkit* (send(), sendWatermark(), finish(),
> >> > >>>> assertSink()), backed by isolated in-JVM buffers for reproducible
> >> > >>> control.
> >> > >>>> The focus is on testing *pipeline logic and wiring*, not connector
> >> > >>>> implementations themselves.
> >> > >>>>
> >> > >>>> For more details refer to this doc
> >> > >>>> <
> >> > >>>>
> >> > >>
> >> >
> >> https://docs.google.com/document/d/1lwA4BE4vHTkIBQ-IyEBjPh2l7y7Z3MGzlbOlDaKDnzE/edit?tab=t.0
> >> > >>>> .
> >> > >>>> Looking forward to your feedback.
> >> > >>>> Thanks,
> >> > >>>> Poorvank Bhatia
> >> > >>>>
> >> >
> >>
> >
> 

Reply via email to