Thanks Etienne,

The pointers are super helpful. The proposal is very much in the same
spirit:

   1. Beam TestPipeline ⇄ PipelineTestRunner (own the job lifecycle, wire
   sources/sinks, await completion).
   2. Beam TestStream ⇄ ManualPipelineSource/Sink (inject elements +
   watermarks deterministically; finish stream).
   3. Beam PAssert ⇄ SinkAssertions (eventually/any-order/exact-order
   checks; we’re also adding side-output assertions).

The key difference is that this is driven by Flink’s runtime.


On Mon, Oct 13, 2025 at 8:04 PM Gustavo de Morais <[email protected]>
wrote:

> Hey Poorvank,
>
> Thanks for the proposal. It's nice that it's compatible with both Table &
> DataStream API’s. Could you add a basic example with TableAPI or just
> extend the current one showing how it'd look like?
>
> Kind regards,
> Gustavo
>
>
>
> On Mon, 13 Oct 2025 at 16:14, Etienne Chauchot <[email protected]>
> wrote:
>
> > Hi Poorvank,
> >
> > Thanks for this proposition. I agree with the need and the general
> > architecture.
> >
> > In Apache Beam project there was a similar test framework that allowed
> > to run a complete pipeline in test with control on watermark (set the
> > watermark of the source), processing time (change clock time), wait for
> > pipeline finish and then do some assertions on the output stream.
> >
> > Here are the pointers to the different classes to take some inspiration
> > from:
> >
> > -
> >
> >
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
> >
> > -
> >
> >
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
> >
> > -
> >
> >
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
> >
> > Best
> >
> > Etienne
> >
> > Le 07/10/2025 à 05:57, Poorvank Bhatia a écrit :
> > > Hi Samrat,
> > >
> > > Thanks for the review. Please find the responses inline below.
> > >
> > > 1. For complex stateful applications, verifying only the final output
> at
> > >> the sink may not be sufficient. Sometimes, there is a need to validate
> > the
> > >> intermediate state being built up within operators to ensure
> > correctness.
> > >> Is there a mechanism to inspect and verify the contents of an
> operator's
> > >> state?
> > >
> > >   IMO For fine-grained, deterministic validation of
> > Value/List/Map/Broadcast
> > > state and processing-time timers, the existing Operator/Function test
> > > harnesses are the right tool
> > >
> > > A common pattern in many of the Flink jobs is the use of side outputs
> to
> > >> route different types of data to different downstream processes. The
> > >> singular *assertSink() *method appears to target only the main data
> > stream.
> > >> Can you consider extending the interface to allow tests to capture and
> > >> assert data emitted to named side outputs?
> > >> This could take the form of an assertion method like
> > >> *assertSideOutput(OutputTag<T>
> > >> tag, T... expected)*, enabling comprehensive testing of pipelines with
> > side
> > >> output logic. WDUT?
> > >
> > > Agreed. Can add first-class support for side outputs by
> > > registering OutputTag<T>s and collecting them via a named buffer, e.g.:
> > >
> > >     -
> > >
> > >     Runner: withSideOutput(OutputTag<T> tag[, String name])
> > >     -
> > >
> > >     Assertions: assertSideOutput(tag).receivedExactlyInAnyOrder(...)
> > >     Implementation reuses the same shared sink buffer under a
> namespaced
> > key
> > >     for each tag.
> > >
> > >
> > >    a. *sendWatermarkAndAwaitIdle()*: A method that sends a watermark
> > >> and then blocks until the framework can determine that all in-flight
> > >> records and timers triggered by that watermark have been fully
> > processed.
> > >> This would help users to add tests for windowed operations by removing
> > the
> > >> need for manual waits or sleeps.
> > >
> > > Not sure what would be the right way to do that. The runtime doesn’t
> > expose
> > > a reliable, global “quiescent after W” signal. Watermark firings can
> > > cascade, and in-flight work sits in mailboxes, and async callbacks—so
> > > "fully processed" is unverifiable end-to-end.
> > >
> > >
> > >>           b.* advanceProcessingTime(Duration duration)*: Method to
> > advance
> > >> the notion of processing time within the test environment. This will
> > help
> > >> add deterministic testing logic that depends on processing-time timers
> > and
> > >> windows.
> > >>
> > >> AFAIK, same as above i think the the Operator/Function harnesses
> already
> > > support deterministic processing-time control.
> > >
> > > Thanks,
> > > Poorvank Bhatia
> > >
> > > On Wed, Sep 24, 2025 at 11:59 AM Samrat Deb <[email protected]>
> > wrote:
> > >
> > >> Thank you, Poorvank, for the proposal. This would be a valuable
> addition
> > >> toward building reliable and well-tested Flink applications. The
> > proposal
> > >> is already in excellent shape.
> > >>
> > >> Below are my take and suggestions on the proposal :
> > >>
> > >> 1. For complex stateful applications, verifying only the final output
> at
> > >> the sink may not be sufficient. Sometimes, there is a need to validate
> > the
> > >> intermediate state being built up within operators to ensure
> > correctness.
> > >> Is there a mechanism to inspect and verify the contents of an
> operator's
> > >> state?
> > >>
> > >> 2. A common pattern in many of the Flink jobs is the use of side
> > outputs to
> > >> route different types of data to different downstream processes. The
> > >> singular *assertSink() *method appears to target only the main data
> > stream.
> > >> Can you consider extending the interface to allow tests to capture and
> > >> assert data emitted to named side outputs?
> > >> This could take the form of an assertion method like
> > >> *assertSideOutput(OutputTag<T>
> > >> tag, T... expected)*, enabling comprehensive testing of pipelines with
> > side
> > >> output logic. WDUT?
> > >>
> > >> 3. The *sendWatermark()* method is a solid foundation for event-time
> > >> testing. However, more complex scenarios involving windows and timers
> > would
> > >> benefit from finer-grained control.
> > >> My suggestion would be to consider adding the following 2 methods :
> > >>
> > >>           a. *sendWatermarkAndAwaitIdle()*: A method that sends a
> > watermark
> > >> and then blocks until the framework can determine that all in-flight
> > >> records and timers triggered by that watermark have been fully
> > processed.
> > >> This would help users to add tests for windowed operations by removing
> > the
> > >> need for manual waits or sleeps.
> > >>
> > >>           b.* advanceProcessingTime(Duration duration)*: Method to
> > advance
> > >> the notion of processing time within the test environment. This will
> > help
> > >> add deterministic testing logic that depends on processing-time timers
> > and
> > >> windows.
> > >>
> > >>
> > >> Bests,
> > >> Samrat
> > >>
> > >> On Wed, Sep 17, 2025 at 1:48 AM Őrhidi Mátyás <
> [email protected]>
> > >> wrote:
> > >>
> > >>> Thanks Poorvank,
> > >>>
> > >>> The proposal looks great.
> > >>>
> > >>> Thanks,
> > >>> Matyas
> > >>>
> > >>> On Thu, Sep 11, 2025 at 3:44 AM Poorvank Bhatia <
> > [email protected]>
> > >>> wrote:
> > >>>
> > >>>> Hi all,
> > >>>> I’d like to propose a Pipeline Testing Framework (on Source/Sink V2)
> > >> as a
> > >>>> part of flink's testing framework, built on concepts presented by
> the
> > >>> talk
> > >>>> *“*Testing Production Streaming Applications
> > >>>> <https://www.youtube.com/watch?v=lXHleussX9Q>” (Gyula Fóra & Mátyás
> > >>>> Orhidi).
> > >>>>
> > >>>> The aim is to let users write end-to-end pipeline flow tests —
> > >> replacing
> > >>>> real connectors with lightweight TestSources/TestSinks — so they can
> > >>>> deterministically inject records/watermarks and assert outputs under
> > >>>> parallelism.
> > >>>>
> > >>>> Unlike MiniCluster, local env, or operator harnesses, this provides
> a
> > >>> small
> > >>>> Source/Sink V2-based *testkit* (send(), sendWatermark(), finish(),
> > >>>> assertSink()), backed by isolated in-JVM buffers for reproducible
> > >>> control.
> > >>>> The focus is on testing *pipeline logic and wiring*, not connector
> > >>>> implementations themselves.
> > >>>>
> > >>>> For more details refer to this doc
> > >>>> <
> > >>>>
> > >>
> >
> https://docs.google.com/document/d/1lwA4BE4vHTkIBQ-IyEBjPh2l7y7Z3MGzlbOlDaKDnzE/edit?tab=t.0
> > >>>> .
> > >>>> Looking forward to your feedback.
> > >>>> Thanks,
> > >>>> Poorvank Bhatia
> > >>>>
> >
>

Reply via email to