Makes sense. Thank you.

On Fri, 17 Oct 2025, 6:32 pm Ferenc Csaky, <[email protected]> wrote:
> Thank you!
>
> I believe the proposal is in good shape now, so I would suggest we wait until Monday EOD, and if there are no new inputs on the discussion thread, I can create a FLIP on the wiki page, and after that you can initiate a vote thread. WDYT?
>
> Best,
> Ferenc
>
> On Friday, October 17th, 2025 at 14:53, Poorvank Bhatia <[email protected]> wrote:
>
> > Hey Ferenc Csaky,
> >
> > Thanks for the feedback—great catch. The intent was a fluent API that returns the same instance (“returns this”), but I agree a Builder makes the design clearer and keeps the runner immutable. I have switched the doc to a builder-style API and updated the examples. Thanks!
> >
> > On Fri, Oct 17, 2025 at 4:40 PM Ferenc Csaky <[email protected]> wrote:
> >
> > > Hi Poorvank,
> > >
> > > Thanks for starting this discussion, I also agree that such functionality would be very useful, +1 for the idea.
> > >
> > > One question/suggestion. Based on the current class design it seems every `with...` method of `PipelineTestRunner` produces a new instance, which does not seem reasonable. I believe the current design should utilize the builder pattern instead of a static factory method, and all `with...` methods should be moved into its builder. WDYT?
> > >
> > > Best,
> > > Ferenc
> > >
> > > On Thursday, October 16th, 2025 at 06:48, Gyula Fora <[email protected]> wrote:
> > >
> > > > Hey Poorvank!
> > > >
> > > > +1 from my side. I think this would be a nice addition to Flink. The built-in testing utilities are more targeted towards single-operator tests / completely black-box testing.
> > > >
> > > > In our experience we have found pipeline testing utils exceedingly useful.
> > > >
> > > > Cheers
> > > > Gyula
> > > >
> > > > On 2025/10/14 12:33:21 Poorvank Bhatia wrote:
> > > >
> > > > > Thanks Gustavo,
> > > > >
> > > > > Thanks for the review. I have modified the doc and added a minimal SQL/Table example as well.
> > > > >
> > > > > On Tue, Oct 14, 2025 at 6:03 PM Poorvank Bhatia <[email protected]> wrote:
> > > > >
> > > > > > Thanks Etienne,
> > > > > >
> > > > > > The pointers are super helpful. The proposal is very much in the same spirit:
> > > > > >
> > > > > > 1. Beam TestPipeline ⇄ PipelineTestRunner (own the job lifecycle, wire sources/sinks, await completion).
> > > > > > 2. Beam TestStream ⇄ ManualPipelineSource/Sink (inject elements + watermarks deterministically; finish the stream).
> > > > > > 3. Beam PAssert ⇄ SinkAssertions (eventually/any-order/exact-order checks; we’re also adding side-output assertions).
> > > > > >
> > > > > > The key difference is that this is driven by Flink’s runtime.
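Just to make sure I am reading the mapping and the builder change correctly, here is roughly how I would picture a test. All names (PipelineTestRunner, ManualPipelineSource/Sink, the with.../send.../assert... methods) are taken from this thread rather than the doc, so treat this as an illustrative sketch, not the actual API:

    // Illustrative sketch only; names mirror the discussion above, not a released API.
    ManualPipelineSource<String> source = new ManualPipelineSource<>();
    ManualPipelineSink<String> sink = new ManualPipelineSink<>();

    PipelineTestRunner runner = PipelineTestRunner.builder()   // builder-style, per Ferenc's suggestion
            .withParallelism(2)
            .withPipeline(env -> {
                // Pipeline under test, wired against the test source/sink instead of real connectors.
                // Watermarks are injected from the test source, so no generator is configured here.
                DataStream<String> in =
                        env.fromSource(source, WatermarkStrategy.noWatermarks(), "test-source");
                in.map(String::toUpperCase).sinkTo(sink);
            })
            .build();

    source.send("a", "b");            // inject records deterministically
    source.sendWatermark(1_000L);     // advance event time
    source.finish();                  // mark the bounded test input as complete

    runner.awaitCompletion();         // own the job lifecycle, as in Beam's TestPipeline
    runner.assertSink(sink).receivedExactlyInAnyOrder("A", "B");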
> > > > > > On Mon, Oct 13, 2025 at 8:04 PM Gustavo de Morais <[email protected]> wrote:
> > > > > >
> > > > > > > Hey Poorvank,
> > > > > > >
> > > > > > > Thanks for the proposal. It's nice that it's compatible with both the Table & DataStream APIs. Could you add a basic example with the Table API, or just extend the current one to show how it would look?
> > > > > > >
> > > > > > > Kind regards,
> > > > > > > Gustavo
> > > > > > >
> > > > > > > On Mon, 13 Oct 2025 at 16:14, Etienne Chauchot <[email protected]> wrote:
> > > > > > >
> > > > > > > > Hi Poorvank,
> > > > > > > >
> > > > > > > > Thanks for this proposition. I agree with the need and the general architecture.
> > > > > > > >
> > > > > > > > In the Apache Beam project there was a similar test framework that allowed running a complete pipeline in a test with control over the watermark (setting the watermark of the source) and processing time (changing the clock time), waiting for the pipeline to finish, and then doing some assertions on the output stream.
> > > > > > > >
> > > > > > > > Here are the pointers to the different classes to take some inspiration from:
> > > > > > > >
> > > > > > > > - https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
> > > > > > > > - https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
> > > > > > > > - https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
> > > > > > > >
> > > > > > > > Best
> > > > > > > >
> > > > > > > > Etienne
> > > > > > > >
> > > > > > > > On 07/10/2025 at 05:57, Poorvank Bhatia wrote:
> > > > > > > >
> > > > > > > > > Hi Samrat,
> > > > > > > > >
> > > > > > > > > Thanks for the review. Please find the responses inline below.
> > > > > > > > >
> > > > > > > > > > 1. For complex stateful applications, verifying only the final output at the sink may not be sufficient. Sometimes, there is a need to validate the intermediate state being built up within operators to ensure correctness. Is there a mechanism to inspect and verify the contents of an operator's state?
> > > > > > > > >
> > > > > > > > > IMO, for fine-grained, deterministic validation of Value/List/Map/Broadcast state and processing-time timers, the existing Operator/Function test harnesses are the right tool.
> > > > > > > > >
> > > > > > > > > > 2. A common pattern in many Flink jobs is the use of side outputs to route different types of data to different downstream processes. The singular *assertSink()* method appears to target only the main data stream. Can you consider extending the interface to allow tests to capture and assert data emitted to named side outputs? This could take the form of an assertion method like assertSideOutput(OutputTag<T> tag, T... expected), enabling comprehensive testing of pipelines with side output logic. WDYT?
> > > > > > > > >
> > > > > > > > > Agreed. We can add first-class support for side outputs by registering OutputTag<T>s and collecting them via a named buffer, e.g.:
> > > > > > > > >
> > > > > > > > > - Runner: withSideOutput(OutputTag<T> tag[, String name])
> > > > > > > > > - Assertions: assertSideOutput(tag).receivedExactlyInAnyOrder(...)
> > > > > > > > >
> > > > > > > > > The implementation reuses the same shared sink buffer under a namespaced key for each tag.
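And for the side-output piece just above, I would picture the test roughly like the snippet below. It is purely illustrative: withSideOutput/assertSideOutput are the names floated in this thread, and mainSink stands in for the test sink from the earlier sketch; only OutputTag is the standard Flink class:

    // Illustrative sketch only; the side-output hooks follow the proposal above, not a released API.
    OutputTag<String> lateTag = new OutputTag<String>("late-records") {};   // standard Flink OutputTag

    PipelineTestRunner runner = PipelineTestRunner.builder()
            .withPipeline(env -> {
                // pipeline under test whose ProcessFunction routes late records to `lateTag`
            })
            .withSideOutput(lateTag)   // collect the tag into a namespaced test buffer, per the plan above
            .build();

    // ... send records and watermarks, finish the source, await completion ...

    runner.assertSink(mainSink).receivedExactlyInAnyOrder("on-time-1", "on-time-2");   // main stream
    runner.assertSideOutput(lateTag).receivedExactlyInAnyOrder("late-1");              // side output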
> > > > > > > > > > a. sendWatermarkAndAwaitIdle(): A method that sends a watermark and then blocks until the framework can determine that all in-flight records and timers triggered by that watermark have been fully processed. This would help users add tests for windowed operations by removing the need for manual waits or sleeps.
> > > > > > > > >
> > > > > > > > > Not sure what would be the right way to do that. The runtime doesn’t expose a reliable, global “quiescent after W” signal. Watermark firings can cascade, and in-flight work sits in mailboxes and async callbacks—so “fully processed” is unverifiable end-to-end.
> > > > > > > > >
> > > > > > > > > > b. *advanceProcessingTime(Duration duration)*: A method to advance the notion of processing time within the test environment. This will help add deterministic testing logic that depends on processing-time timers and windows.
> > > > > > > > >
> > > > > > > > > AFAIK, same as above, I think the Operator/Function harnesses already support deterministic processing-time control.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Poorvank Bhatia
> > > > > > > > >
> > > > > > > > > On Wed, Sep 24, 2025 at 11:59 AM Samrat Deb <[email protected]> wrote:
> > > > > > > > >
> > > > > > > > > > Thank you, Poorvank, for the proposal. This would be a valuable addition toward building reliable and well-tested Flink applications. The proposal is already in excellent shape.
> > > > > > > > > >
> > > > > > > > > > Below are my take and suggestions on the proposal:
> > > > > > > > > >
> > > > > > > > > > 1. For complex stateful applications, verifying only the final output at the sink may not be sufficient. Sometimes, there is a need to validate the intermediate state being built up within operators to ensure correctness. Is there a mechanism to inspect and verify the contents of an operator's state?
> > > > > > > > > > 2. A common pattern in many Flink jobs is the use of side outputs to route different types of data to different downstream processes. The singular *assertSink()* method appears to target only the main data stream. Can you consider extending the interface to allow tests to capture and assert data emitted to named side outputs? This could take the form of an assertion method like assertSideOutput(OutputTag<T> tag, T... expected), enabling comprehensive testing of pipelines with side output logic. WDYT?
> > > > > > > > > >
> > > > > > > > > > 3. The sendWatermark() method is a solid foundation for event-time testing. However, more complex scenarios involving windows and timers would benefit from finer-grained control. My suggestion would be to consider adding the following two methods:
> > > > > > > > > >
> > > > > > > > > > a. sendWatermarkAndAwaitIdle(): A method that sends a watermark and then blocks until the framework can determine that all in-flight records and timers triggered by that watermark have been fully processed. This would help users add tests for windowed operations by removing the need for manual waits or sleeps.
> > > > > > > > > >
> > > > > > > > > > b. *advanceProcessingTime(Duration duration)*: A method to advance the notion of processing time within the test environment. This will help add deterministic testing logic that depends on processing-time timers and windows.
> > > > > > > > > >
> > > > > > > > > > Bests,
> > > > > > > > > > Samrat
> > > > > > > > > >
> > > > > > > > > > On Wed, Sep 17, 2025 at 1:48 AM Őrhidi Mátyás <[email protected]> wrote:
> > > > > > > > > >
> > > > > > > > > > > Thanks Poorvank,
> > > > > > > > > > >
> > > > > > > > > > > The proposal looks great.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Matyas
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Sep 11, 2025 at 3:44 AM Poorvank Bhatia <[email protected]> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi all,
> > > > > > > > > > > >
> > > > > > > > > > > > I’d like to propose a Pipeline Testing Framework (on Source/Sink V2) as a part of Flink’s testing framework, built on concepts presented in the talk “Testing Production Streaming Applications” (https://www.youtube.com/watch?v=lXHleussX9Q) by Gyula Fóra & Mátyás Orhidi.
> > > > > > > > > > > > The aim is to let users write end-to-end pipeline flow tests — replacing real connectors with lightweight TestSources/TestSinks — so they can deterministically inject records/watermarks and assert outputs under parallelism.
> > > > > > > > > > > >
> > > > > > > > > > > > Unlike MiniCluster, the local environment, or operator harnesses, this provides a small Source/Sink V2-based testkit (send(), sendWatermark(), finish(), assertSink()), backed by isolated in-JVM buffers for reproducible control. The focus is on testing pipeline logic and wiring, not connector implementations themselves.
> > > > > > > > > > > >
> > > > > > > > > > > > For more details refer to this doc: https://docs.google.com/document/d/1lwA4BE4vHTkIBQ-IyEBjPh2l7y7Z3MGzlbOlDaKDnzE/edit?tab=t.0
> > > > > > > > > > > >
> > > > > > > > > > > > Looking forward to your feedback.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Poorvank Bhatia
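On Gustavo's Table API question: since the testkit sits at the source/sink level, I would expect the minimal SQL/Table example in the doc to look roughly like the sketch below. The runner and testSource/testSink names are again the hypothetical ones from this thread, and Order is a stand-in POJO with userId/amount fields; the Table API calls themselves (StreamTableEnvironment, createTemporaryView, sqlQuery, toChangelogStream) are standard Flink:

    // Illustrative sketch only; `testSource`/`testSink` and the runner are the hypothetical
    // testkit pieces from this thread, `Order` is a stand-in POJO with userId/amount fields.
    PipelineTestRunner runner = PipelineTestRunner.builder()
            .withPipeline(env -> {
                StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

                // Expose the deterministic test source as a table ...
                DataStream<Order> orders =
                        env.fromSource(testSource, WatermarkStrategy.noWatermarks(), "orders");
                tEnv.createTemporaryView("orders", orders);

                // ... run the SQL under test ...
                Table result = tEnv.sqlQuery(
                        "SELECT userId, SUM(amount) AS total FROM orders GROUP BY userId");

                // ... and route the (updating) result back into the test sink for assertions.
                tEnv.toChangelogStream(result).sinkTo(testSink);
            })
            .build();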
