Hey Ferenc Csaky,

Thanks for the feedback—great catch. The intent was a fluent API that
returns the same instance (“returns this”), but I agree a Builder makes the
design clearer and keeps the runner immutable. I have switched the doc to a
builder-style API,  updated the examples. Thanks!




On Fri, Oct 17, 2025 at 4:40 PM Ferenc Csaky <[email protected]>
wrote:

> Hi Poorvank,
>
> Thanks for starting this discussion, I also agree that such functionality
> would
> be very useful, +1 for the idea.
>
> One question/suggestion. Based on the current class design it seems every
> `with...` method of `PipelineTestRunner` produces a new instance, which
> does
> not seem reasonable. I believe that the current design should utilize the
> builder pattern instead of a static factory method, and all `with...`
> should be
> moved into its builder. WDYT?
>
> Best,
> Ferenc
>
>
> On Thursday, October 16th, 2025 at 06:48, Gyula Fora <[email protected]>
> wrote:
>
> >
> >
> > Hey Poorvank!
> >
> > +1 from my side. I think this would be a nice addition to Flink. The
> built in testing utilities are more targeted towards single operator
> tests/completely black box testing.
> >
> > In our experience we have found pipeline testing utils exceedingly
> useful.
> >
> > Cheers
> > Gyula
> >
> > On 2025/10/14 12:33:21 Poorvank Bhatia wrote:
> >
> > > Thanks Gustavo,
> > >
> > > Thanks for the review.
> > > I have modified the doc and added a minimal SQL/Table example as well.
> > >
> > > On Tue, Oct 14, 2025 at 6:03 PM Poorvank Bhatia [email protected]
> > > wrote:
> > >
> > > > Thanks Etienne,
> > > >
> > > > The pointers are super helpful. The proposal is very much in the same
> > > > spirit:
> > > >
> > > > 1. Beam TestPipeline ⇄ PipelineTestRunner (own the job lifecycle,
> wire
> > > > sources/sinks, await completion).
> > > > 2. Beam TestStream ⇄ ManualPipelineSource/Sink (inject elements +
> > > > watermarks deterministically; finish stream).
> > > > 3. Beam PAssert ⇄ SinkAssertions (eventually/any-order/exact-order
> > > > checks; we’re also adding side-output assertions).
> > > >
> > > > The key difference is that this is driven by Flink’s runtime.
> > > >
> > > > On Mon, Oct 13, 2025 at 8:04 PM Gustavo de Morais
> [email protected]
> > > > wrote:
> > > >
> > > > > Hey Poorvank,
> > > > >
> > > > > Thanks for the proposal. It's nice that it's compatible with both
> Table &
> > > > > DataStream API’s. Could you add a basic example with TableAPI or
> just
> > > > > extend the current one showing how it'd look like?
> > > > >
> > > > > Kind regards,
> > > > > Gustavo
> > > > >
> > > > > On Mon, 13 Oct 2025 at 16:14, Etienne Chauchot
> [email protected]
> > > > > wrote:
> > > > >
> > > > > > Hi Poorvank,
> > > > > >
> > > > > > Thanks for this proposition. I agree with the need and the
> general
> > > > > > architecture.
> > > > > >
> > > > > > In Apache Beam project there was a similar test framework that
> allowed
> > > > > > to run a complete pipeline in test with control on watermark
> (set the
> > > > > > watermark of the source), processing time (change clock time),
> wait for
> > > > > > pipeline finish and then do some assertions on the output stream.
> > > > > >
> > > > > > Here are the pointers to the different classes to take some
> inspiration
> > > > > > from:
> > > > > >
> > > > > > -
> > > > >
> > > > >
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
> > > > >
> > > > > > -
> > > > >
> > > > >
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
> > > > >
> > > > > > -
> > > > >
> > > > >
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
> > > > >
> > > > > > Best
> > > > > >
> > > > > > Etienne
> > > > > >
> > > > > > Le 07/10/2025 à 05:57, Poorvank Bhatia a écrit :
> > > > > >
> > > > > > > Hi Samrat,
> > > > > > >
> > > > > > > Thanks for the review. Please find the responses inline below.
> > > > > > >
> > > > > > > 1. For complex stateful applications, verifying only the final
> output
> > > > > > > at
> > > > > > >
> > > > > > > > the sink may not be sufficient. Sometimes, there is a need to
> > > > > > > > validate
> > > > > > > > the
> > > > > > > > intermediate state being built up within operators to ensure
> > > > > > > > correctness.
> > > > > > > > Is there a mechanism to inspect and verify the contents of an
> > > > > > > > operator's
> > > > > > > > state?
> > > > > > >
> > > > > > > IMO For fine-grained, deterministic validation of
> > > > > > > Value/List/Map/Broadcast
> > > > > > > state and processing-time timers, the existing
> Operator/Function test
> > > > > > > harnesses are the right tool
> > > > > > >
> > > > > > > A common pattern in many of the Flink jobs is the use of side
> outputs
> > > > > > > to
> > > > > > >
> > > > > > > > route different types of data to different downstream
> processes. The
> > > > > > > > singular *assertSink() *method appears to target only the
> main data
> > > > > > > > stream.
> > > > > > > > Can you consider extending the interface to allow tests to
> capture
> > > > > > > > and
> > > > > > > > assert data emitted to named side outputs?
> > > > > > > > This could take the form of an assertion method like
> > > > > > > > assertSideOutput(OutputTag<T>
> > > > > > > > tag, T... expected), enabling comprehensive testing of
> pipelines
> > > > > > > > with
> > > > > > > > side
> > > > > > > > output logic. WDUT?
> > > > > > >
> > > > > > > Agreed. Can add first-class support for side outputs by
> > > > > > > registering OutputTag<T>s and collecting them via a named
> buffer,
> > > > > > > e.g.:
> > > > > > >
> > > > > > > -
> > > > > > >
> > > > > > > Runner: withSideOutput(OutputTag<T> tag[, String name])
> > > > > > > -
> > > > > > >
> > > > > > > Assertions:
> assertSideOutput(tag).receivedExactlyInAnyOrder(...)
> > > > > > > Implementation reuses the same shared sink buffer under a
> > > > > > > namespaced
> > > > > > > key
> > > > > > > for each tag.
> > > > > > >
> > > > > > > a. sendWatermarkAndAwaitIdle(): A method that sends a watermark
> > > > > > >
> > > > > > > > and then blocks until the framework can determine that all
> in-flight
> > > > > > > > records and timers triggered by that watermark have been
> fully
> > > > > > > > processed.
> > > > > > > > This would help users to add tests for windowed operations by
> > > > > > > > removing
> > > > > > > > the
> > > > > > > > need for manual waits or sleeps.
> > > > > > >
> > > > > > > Not sure what would be the right way to do that. The runtime
> doesn’t
> > > > > > > expose
> > > > > > > a reliable, global “quiescent after W” signal. Watermark
> firings can
> > > > > > > cascade, and in-flight work sits in mailboxes, and async
> callbacks—so
> > > > > > > "fully processed" is unverifiable end-to-end.
> > > > > > >
> > > > > > > > b.* advanceProcessingTime(Duration duration)*: Method to
> > > > > > > > advance
> > > > > > > > the notion of processing time within the test environment.
> This will
> > > > > > > > help
> > > > > > > > add deterministic testing logic that depends on
> processing-time
> > > > > > > > timers
> > > > > > > > and
> > > > > > > > windows.
> > > > > > > >
> > > > > > > > AFAIK, same as above i think the the Operator/Function
> harnesses
> > > > > > > > already
> > > > > > > > support deterministic processing-time control.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Poorvank Bhatia
> > > > > > >
> > > > > > > On Wed, Sep 24, 2025 at 11:59 AM Samrat Deb
> [email protected]
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Thank you, Poorvank, for the proposal. This would be a
> valuable
> > > > > > > > addition
> > > > > > > > toward building reliable and well-tested Flink applications.
> The
> > > > > > > > proposal
> > > > > > > > is already in excellent shape.
> > > > > > > >
> > > > > > > > Below are my take and suggestions on the proposal :
> > > > > > > >
> > > > > > > > 1. For complex stateful applications, verifying only the
> final
> > > > > > > > output at
> > > > > > > > the sink may not be sufficient. Sometimes, there is a need to
> > > > > > > > validate
> > > > > > > > the
> > > > > > > > intermediate state being built up within operators to ensure
> > > > > > > > correctness.
> > > > > > > > Is there a mechanism to inspect and verify the contents of an
> > > > > > > > operator's
> > > > > > > > state?
> > > > > > > >
> > > > > > > > 2. A common pattern in many of the Flink jobs is the use of
> side
> > > > > > > > outputs to
> > > > > > > > route different types of data to different downstream
> processes. The
> > > > > > > > singular *assertSink() *method appears to target only the
> main data
> > > > > > > > stream.
> > > > > > > > Can you consider extending the interface to allow tests to
> capture
> > > > > > > > and
> > > > > > > > assert data emitted to named side outputs?
> > > > > > > > This could take the form of an assertion method like
> > > > > > > > assertSideOutput(OutputTag<T>
> > > > > > > > tag, T... expected), enabling comprehensive testing of
> pipelines
> > > > > > > > with
> > > > > > > > side
> > > > > > > > output logic. WDUT?
> > > > > > > >
> > > > > > > > 3. The sendWatermark() method is a solid foundation for
> event-time
> > > > > > > > testing. However, more complex scenarios involving windows
> and timers
> > > > > > > > would
> > > > > > > > benefit from finer-grained control.
> > > > > > > > My suggestion would be to consider adding the following 2
> methods :
> > > > > > > >
> > > > > > > > a. sendWatermarkAndAwaitIdle(): A method that sends a
> > > > > > > > watermark
> > > > > > > > and then blocks until the framework can determine that all
> in-flight
> > > > > > > > records and timers triggered by that watermark have been
> fully
> > > > > > > > processed.
> > > > > > > > This would help users to add tests for windowed operations by
> > > > > > > > removing
> > > > > > > > the
> > > > > > > > need for manual waits or sleeps.
> > > > > > > >
> > > > > > > > b.* advanceProcessingTime(Duration duration)*: Method to
> > > > > > > > advance
> > > > > > > > the notion of processing time within the test environment.
> This will
> > > > > > > > help
> > > > > > > > add deterministic testing logic that depends on
> processing-time
> > > > > > > > timers
> > > > > > > > and
> > > > > > > > windows.
> > > > > > > >
> > > > > > > > Bests,
> > > > > > > > Samrat
> > > > > > > >
> > > > > > > > On Wed, Sep 17, 2025 at 1:48 AM Őrhidi Mátyás <
> > > > > > > > [email protected]>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Thanks Poorvank,
> > > > > > > > >
> > > > > > > > > The proposal looks great.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Matyas
> > > > > > > > >
> > > > > > > > > On Thu, Sep 11, 2025 at 3:44 AM Poorvank Bhatia <
> > > > > > > > > [email protected]>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi all,
> > > > > > > > > > I’d like to propose a Pipeline Testing Framework (on
> Source/Sink
> > > > > > > > > > V2)
> > > > > > > > > > as a
> > > > > > > > > > part of flink's testing framework, built on concepts
> presented by
> > > > > > > > > > the
> > > > > > > > > > talk
> > > > > > > > > > *“*Testing Production Streaming Applications
> > > > > > > > > > https://www.youtube.com/watch?v=lXHleussX9Q” (Gyula
> Fóra &
> > > > > > > > > > Mátyás
> > > > > > > > > > Orhidi).
> > > > > > > > > >
> > > > > > > > > > The aim is to let users write end-to-end pipeline flow
> tests —
> > > > > > > > > > replacing
> > > > > > > > > > real connectors with lightweight TestSources/TestSinks —
> so they
> > > > > > > > > > can
> > > > > > > > > > deterministically inject records/watermarks and assert
> outputs
> > > > > > > > > > under
> > > > > > > > > > parallelism.
> > > > > > > > > >
> > > > > > > > > > Unlike MiniCluster, local env, or operator harnesses,
> this
> > > > > > > > > > provides a
> > > > > > > > > > small
> > > > > > > > > > Source/Sink V2-based testkit (send(), sendWatermark(),
> finish(),
> > > > > > > > > > assertSink()), backed by isolated in-JVM buffers for
> reproducible
> > > > > > > > > > control.
> > > > > > > > > > The focus is on testing pipeline logic and wiring, not
> connector
> > > > > > > > > > implementations themselves.
> > > > > > > > > >
> > > > > > > > > > For more details refer to this doc
> > > > > > > > > > <
> > > > >
> > > > >
> https://docs.google.com/document/d/1lwA4BE4vHTkIBQ-IyEBjPh2l7y7Z3MGzlbOlDaKDnzE/edit?tab=t.0
> > > > >
> > > > > > > > > > .
> > > > > > > > > > Looking forward to your feedback.
> > > > > > > > > > Thanks,
> > > > > > > > > > Poorvank Bhatia
>

Reply via email to