Hi Samrat,

Thanks for the review. Please find the responses inline below.

> 1. For complex stateful applications, verifying only the final output at
> the sink may not be sufficient. Sometimes, there is a need to validate the
> intermediate state being built up within operators to ensure correctness.
> Is there a mechanism to inspect and verify the contents of an operator's
> state?


IMO, for fine-grained, deterministic validation of Value/List/Map/Broadcast
state and processing-time timers, the existing Operator/Function test
harnesses are the right tool.

> 2. A common pattern in many of the Flink jobs is the use of side outputs to
> route different types of data to different downstream processes. The
> singular *assertSink() *method appears to target only the main data stream.
> Can you consider extending the interface to allow tests to capture and
> assert data emitted to named side outputs?
> This could take the form of an assertion method like
> *assertSideOutput(OutputTag<T>
> tag, T... expected)*, enabling comprehensive testing of pipelines with side
> output logic. WDUT?


Agreed. We can add first-class support for side outputs by registering
OutputTag<T>s and collecting them via a named buffer, e.g.:

   - Runner: withSideOutput(OutputTag<T> tag[, String name])
   - Assertions: assertSideOutput(tag).receivedExactlyInAnyOrder(...)

The implementation would reuse the same shared sink buffer, under a
namespaced key for each tag.
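
To make the namespaced-key idea concrete, here is a rough, stand-alone
sketch in plain Java. It does not use any Flink classes, and every name in
it (NamespacedSinkBuffer, the "sinkName/namespace" key layout, the "main"
namespace) is a hypothetical placeholder for illustration, not the
framework's actual implementation:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch only: not part of Flink or of the proposed testkit.
final class NamespacedSinkBuffer {
    // One shared map. Each key is "<sinkName>/<namespace>", where the
    // namespace is "main" or the id of a side-output tag (assumed layout).
    private final Map<String, List<Object>> records = new ConcurrentHashMap<>();

    private static String key(String sinkName, String namespace) {
        return sinkName + "/" + namespace;
    }

    // Called by the (hypothetical) test sink for every record it receives.
    void add(String sinkName, String namespace, Object record) {
        records.computeIfAbsent(key(sinkName, namespace),
                k -> new CopyOnWriteArrayList<>()).add(record);
    }

    // Returns a copy, so assertions never see concurrent modifications.
    List<Object> snapshot(String sinkName, String namespace) {
        return new ArrayList<>(
                records.getOrDefault(key(sinkName, namespace), Collections.emptyList()));
    }

    // Order-insensitive, multiset-style comparison against the expected records.
    boolean receivedExactlyInAnyOrder(String sinkName, String namespace, Object... expected) {
        List<Object> remaining = snapshot(sinkName, namespace);
        if (remaining.size() != expected.length) {
            return false;
        }
        for (Object e : expected) {
            if (!remaining.remove(e)) {
                return false;
            }
        }
        return true;
    }
}
```

With this layout, an assertSideOutput(tag) call would only need to resolve
the tag's id to a namespace and read from the same buffer that assertSink()
already uses.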


>   a. *sendWatermarkAndAwaitIdle()*: A method that sends a watermark
> and then blocks until the framework can determine that all in-flight
> records and timers triggered by that watermark have been fully processed.
> This would help users to add tests for windowed operations by removing the
> need for manual waits or sleeps.


I'm not sure what the right way to do that would be. The runtime doesn't
expose a reliable, global "quiescent after W" signal. Watermark firings can
cascade, and in-flight work sits in mailboxes and async callbacks, so
"fully processed" cannot be verified end-to-end.


>          b.* advanceProcessingTime(Duration duration)*: Method to advance
> the notion of processing time within the test environment. This will help
> add deterministic testing logic that depends on processing-time timers and
> windows.

AFAIK, same as above: I think the Operator/Function harnesses already
support deterministic processing-time control.
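
For readers unfamiliar with what "deterministic processing-time control"
means in practice, here is a small stand-alone sketch of the idea: the test,
not the wall clock, decides when time moves, and timers fire in timestamp
order as time is advanced. ManualProcessingTime and its methods are
hypothetical names for illustration only and are not the harness API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch only: a manually driven clock with timers.
final class ManualProcessingTime {
    private long now = 0L;
    // Timers grouped by timestamp and kept sorted, so firing order is deterministic.
    private final TreeMap<Long, List<Runnable>> timers = new TreeMap<>();

    void registerTimer(long timestamp, Runnable callback) {
        timers.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(callback);
    }

    // Moves time forward and fires every timer due at or before newTime.
    // Callbacks may register further timers; those are fired too if they are due.
    void advanceTo(long newTime) {
        if (newTime < now) {
            throw new IllegalArgumentException("time cannot go backwards");
        }
        while (!timers.isEmpty() && timers.firstKey() <= newTime) {
            Map.Entry<Long, List<Runnable>> due = timers.pollFirstEntry();
            // Each callback observes the timestamp of its own timer.
            now = Math.max(now, due.getKey());
            for (Runnable callback : due.getValue()) {
                callback.run();
            }
        }
        now = newTime;
    }

    long currentTime() {
        return now;
    }
}
```

Because nothing runs on a background thread here, a test can advance time
and assert immediately, with no sleeps or polling.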

Thanks,
Poorvank Bhatia

On Wed, Sep 24, 2025 at 11:59 AM Samrat Deb <[email protected]> wrote:

> Thank you, Poorvank, for the proposal. This would be a valuable addition
> toward building reliable and well-tested Flink applications. The proposal
> is already in excellent shape.
>
> Below are my take and suggestions on the proposal :
>
> 1. For complex stateful applications, verifying only the final output at
> the sink may not be sufficient. Sometimes, there is a need to validate the
> intermediate state being built up within operators to ensure correctness.
> Is there a mechanism to inspect and verify the contents of an operator's
> state?
>
> 2. A common pattern in many of the Flink jobs is the use of side outputs to
> route different types of data to different downstream processes. The
> singular *assertSink() *method appears to target only the main data stream.
> Can you consider extending the interface to allow tests to capture and
> assert data emitted to named side outputs?
> This could take the form of an assertion method like
> *assertSideOutput(OutputTag<T>
> tag, T... expected)*, enabling comprehensive testing of pipelines with side
> output logic. WDUT?
>
> 3. The *sendWatermark()* method is a solid foundation for event-time
> testing. However, more complex scenarios involving windows and timers would
> benefit from finer-grained control.
> My suggestion would be to consider adding the following 2 methods :
>
>          a. *sendWatermarkAndAwaitIdle()*: A method that sends a watermark
> and then blocks until the framework can determine that all in-flight
> records and timers triggered by that watermark have been fully processed.
> This would help users to add tests for windowed operations by removing the
> need for manual waits or sleeps.
>
>          b.* advanceProcessingTime(Duration duration)*: Method to advance
> the notion of processing time within the test environment. This will help
> add deterministic testing logic that depends on processing-time timers and
> windows.
>
>
> Bests,
> Samrat
>
> On Wed, Sep 17, 2025 at 1:48 AM Őrhidi Mátyás <[email protected]>
> wrote:
>
> > Thanks Poorvank,
> >
> > The proposal looks great.
> >
> > Thanks,
> > Matyas
> >
> > On Thu, Sep 11, 2025 at 3:44 AM Poorvank Bhatia <[email protected]>
> > wrote:
> >
> > > Hi all,
> > > I’d like to propose a Pipeline Testing Framework (on Source/Sink V2) as a
> > > part of Flink's testing framework, built on concepts presented by the talk
> > > “Testing Production Streaming Applications”
> > > <https://www.youtube.com/watch?v=lXHleussX9Q> (Gyula Fóra & Mátyás
> > > Orhidi).
> > >
> > > The aim is to let users write end-to-end pipeline flow tests, replacing
> > > real connectors with lightweight TestSources/TestSinks, so they can
> > > deterministically inject records/watermarks and assert outputs under
> > > parallelism.
> > >
> > > Unlike MiniCluster, local env, or operator harnesses, this provides a small
> > > Source/Sink V2-based *testkit* (send(), sendWatermark(), finish(),
> > > assertSink()), backed by isolated in-JVM buffers for reproducible control.
> > > The focus is on testing *pipeline logic and wiring*, not connector
> > > implementations themselves.
> > >
> > > For more details refer to this doc
> > > <https://docs.google.com/document/d/1lwA4BE4vHTkIBQ-IyEBjPh2l7y7Z3MGzlbOlDaKDnzE/edit?tab=t.0>.
> > > Looking forward to your feedback.
> > > Thanks,
> > > Poorvank Bhatia
> > >
> >
>
