Hey Mika,

Thanks for driving this! This is useful for prototyping and, more generally, for speeding up testing, so +1. Some feedback from my side:
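Before the point-by-point feedback: for anyone less familiar with the operator harnesses I keep comparing against, here's roughly what the existing AbstractStreamOperatorTestHarness surface looks like. This is a from-memory sketch against Flink's DataStream test utilities (flink-streaming-java test-jar), not the proposed PTF API:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.OutputTag;

public class ExistingHarnessSketch {
    public static void main(String[] args) throws Exception {
        // Keyed harness with explicit maxParallelism / parallelism / subtaskIndex;
        // maxParallelism drives key-group assignment and thus state distribution.
        OneInputStreamOperatorTestHarness<String, String> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new StreamMap<>(value -> value), // any one-input operator
                        value -> value, Types.STRING,
                        128, 1, 0);
        harness.setStateBackend(new HashMapStateBackend()); // heap vs. RocksDB is pluggable
        harness.open();

        harness.processElement("a", 1L);     // (value, timestamp)
        harness.setProcessingTime(100L);     // processing-time control, not just watermarks
        harness.getSideOutput(new OutputTag<String>("late") {}); // side-output inspection

        // Opaque snapshot; restorable via initializeState() on a fresh harness.
        OperatorSubtaskState snapshot = harness.snapshot(1L, 2L);

        harness.close(); // harness drives the operator lifecycle
    }
}
```

If the FLIP wants to declare any of these out of scope (parallelism, processing time, side outputs), listing them against this surface would make the gaps explicit.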
>For a PTF that reads from a table argument that hasn't been configured, I think it would return null, yes.

I think it'd be a better dev UX if the harness threw during build with something like "table argument 'purchases' is declared in eval() but was never configured".

>I'm not entirely sure on the real/processing time considerations - my aim here was mostly around letting users validate timer behaviour, and timer registration/firing in PTFs is based on watermarks

You've addressed event time, but we need to detail in the FLIP what the approach for processing time would be. The API would ideally support methods similar to what we have here: https://github.com/confluentinc/flink/blob/a7a8dba2127ad719ca7932969b2934a0955e1bba/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java#L790-L804. If you want this to be out of scope, you can mention that explicitly in the FLIP.

Also, comparing this to the current harness support for operators, here are some other things devs might need from a test harness that you may want to include, or explicitly mark as out of scope:

- Side outputs: AbstractStreamOperatorTestHarness has getSideOutput(OutputTag<X>).
- No parallelism config: the existing harness takes maxParallelism, parallelism, and subtaskIndex. Key group assignment (and therefore state distribution) depends on maxParallelism.
- Snapshot is opaque: AbstractStreamOperatorTestHarness.snapshot() returns OperatorSubtaskState, which you can pass to repartitionOperatorState() to simulate parallelism changes.

Kind regards,
Gustavo

On Thu, 12 Mar 2026 at 14:38, Mika Naylor <[email protected]> wrote:
> Hey Martijn,
>
> Thank you for the detailed feedback!
>
> > 1. The FLIP has good examples of harness construction via the builder, but doesn't address lifecycle management. For comparison, the existing DataStream test harnesses (documented at [1]) have explicit open() and implement AutoCloseable.
> > Since PTFs can acquire resources in open(), the harness needs to manage this lifecycle. Could you clarify how open()/close() on the underlying PTF is handled? Is close() called automatically, or does the user need to trigger it? An end-to-end example showing cleanup would help.
>
> This is a good point! I must have missed the AutoCloseable implementation in the public interfaces section - but I did intend to add it! So it would be closed automatically (but I suppose a user could also call close() itself). For the open, I was planning on calling it when the harness is built using `Builder.build()`.
>
> > 2. The existing operator test harnesses support snapshot() and initializeState(OperatorSubtaskState) to simulate checkpoint/restore cycles. This is important for catching state serialization bugs, which are a common source of production issues. The FLIP provides withInitialState() for setup, but there's no way to take a snapshot mid-test and restore into a fresh harness. Are we deliberately excluding this, or should we consider adding it?
>
> I would absolutely consider adding this, thank you for pointing it out. I think being able to take a `.snapshot()` from a harness and then initialise a new harness via a `restore()` on the builder would make sense, as well as maybe supporting `.restore()` on the harness itself after it has been built.
>
> > 3. Related to the above: PTFs with complex state (Map, List, POJO) can behave differently with heap vs. RocksDB backends due to serialization differences. The existing harnesses support setStateBackend(). Should the PTF test harness support this as well? At minimum, it would be good to document which backend is used by default.
>
> I had just intended to support the heap backend and document this, but this is a good point - supporting `setStateBackend()` makes sense here, similar to the existing harnesses.
> I'll add this to the spec and document the default.
>
> > 4. withTableArgument(String tableName, List<Row> rows) is useful for testing join-like PTFs. The builder Javadoc describes when static rows are passed and how table semantics (ROW_SEMANTIC vs SET_SEMANTIC) affect delivery, but a few things remain unclear: How is the schema for these rows determined: is it inferred from the Row structure, or does it need to match the eval() signature's type hints? And what happens if a PTF reads from a table argument that hasn't been configured via the builder: does it receive null, or does the harness throw at build time?
>
> I wasn't quite sure what the right approach is here. I thought that inferring it from the Row structure would work, but it feels odd to ignore the eval type hints. Perhaps I can try the Row structure approach and, if it feels unergonomic, explore the second approach.
>
> For a PTF that reads from a table argument that hasn't been configured, I think it would return null, yes.
>
> > - Timer.hasFired() is typed as boolean (primitive) but annotated @Nullable. This looks like a bug -> should it be Boolean (boxed)?
>
> Oops, good catch. I'm not sure why I marked this as nullable - either a timer has fired or it hasn't, so I don't think returning null makes sense. Maybe returning a non-nullable primitive is fine here.
>
> > - getOutputByKind(RowKind) implies that output preserves RowKind metadata. Could you confirm that getOutput() also retains this? The generic <OUT> type parameter could use more specification on what's guaranteed.
>
> I would like getOutput to somehow retain this, but I'm not quite sure what the return type could look like in this case. Perhaps `RowData`? I'm not entirely sure if we have an interface that would cleanly capture this.
>
> > - Have you considered optional changelog consistency validation (e.g., verifying UPDATE_BEFORE precedes UPDATE_AFTER for the same key)?
> > Could be a useful debugging aid.
>
> I hadn't, no, but this is a useful idea. Could be togglable on the builder with a `.withChangelogValidation` method.
>
> > - What's the error model when eval() or a timer callback throws? Propagated directly, or wrapped?
>
> I would say propagated directly, unless you think wrapping them could be useful here.
>
> > - The test plan mentions leveraging ProcessTableFunctionTestPrograms. Could you clarify whether the harness will be validated against those scenarios, or whether it's intended to replace them for certain use cases?
>
> I think just validated against them, as a way of making sure that the harness covers the right set of features we want to capture. I don't think it would replace them in this case.
>
> Thank you a ton for the feedback and ideas! I will update the FLIP documentation based on them, it's very much appreciated.
>
> Kind regards,
> Mika
>
> On Wed, 11 Mar 2026, at 6:02 PM, Martijn Visser wrote:
> > Hey Mika,
> >
> > Thanks for putting this FLIP together. A dedicated test harness for PTFs is a welcome addition. The builder-pattern API and the state/timer introspection features are well thought out.
> >
> > I have a few questions and suggestions after reviewing the FLIP:
> >
> > 1. The FLIP has good examples of harness construction via the builder, but doesn't address lifecycle management. For comparison, the existing DataStream test harnesses (documented at [1]) have explicit open() and implement AutoCloseable. Since PTFs can acquire resources in open(), the harness needs to manage this lifecycle. Could you clarify how open()/close() on the underlying PTF is handled? Is close() called automatically, or does the user need to trigger it? An end-to-end example showing cleanup would help.
> >
> > 2. The existing operator test harnesses support snapshot() and initializeState(OperatorSubtaskState) to simulate checkpoint/restore cycles. This is important for catching state serialization bugs, which are a common source of production issues. The FLIP provides withInitialState() for setup, but there's no way to take a snapshot mid-test and restore into a fresh harness. Are we deliberately excluding this, or should we consider adding it?
> >
> > 3. Related to the above: PTFs with complex state (Map, List, POJO) can behave differently with heap vs. RocksDB backends due to serialization differences. The existing harnesses support setStateBackend(). Should the PTF test harness support this as well? At minimum, it would be good to document which backend is used by default.
> >
> > 4. withTableArgument(String tableName, List<Row> rows) is useful for testing join-like PTFs. The builder Javadoc describes when static rows are passed and how table semantics (ROW_SEMANTIC vs SET_SEMANTIC) affect delivery, but a few things remain unclear: How is the schema for these rows determined: is it inferred from the Row structure, or does it need to match the eval() signature's type hints? And what happens if a PTF reads from a table argument that hasn't been configured via the builder: does it receive null, or does the harness throw at build time?
> >
> > A few smaller points:
> >
> > - Timer.hasFired() is typed as boolean (primitive) but annotated @Nullable. This looks like a bug -> should it be Boolean (boxed)?
> > - getOutputByKind(RowKind) implies that output preserves RowKind metadata. Could you confirm that getOutput() also retains this? The generic <OUT> type parameter could use more specification on what's guaranteed.
> > - Have you considered optional changelog consistency validation (e.g., verifying UPDATE_BEFORE precedes UPDATE_AFTER for the same key)? Could be a useful debugging aid.
> > - What's the error model when eval() or a timer callback throws? Propagated directly, or wrapped?
> > - The test plan mentions leveraging ProcessTableFunctionTestPrograms. Could you clarify whether the harness will be validated against those scenarios, or whether it's intended to replace them for certain use cases?
> >
> > Overall I'm +1 on the direction. The core API design is clean and covers the main testing needs well. Addressing the lifecycle and checkpoint/restore gaps would bring it in line with what Flink users already have for DataStream UDF testing.
> >
> > Thanks,
> >
> > Martijn
> >
> > [1] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
> >
> > On Fri, Mar 6, 2026 at 9:30 AM Mika Naylor <[email protected]> wrote:
> > > Hey David,
> > >
> > > Yeah, I think in terms of scope I'm aiming more at providing a framework for unit testing the behavior of custom PTFs. I'd like to include as much validation as possible, but there might be validation steps that aren't possible to do without dipping into the engine side of things.
> > >
> > > I'm not entirely sure on the real/processing time considerations - my aim here was mostly around letting users validate timer behaviour, and timer registration/firing in PTFs is based on watermarks, if I read the doc correctly.
> > >
> > > Kind regards,
> > > Mika
> > >
> > > On Wed, 4 Mar 2026, at 10:38 AM, David Radley wrote:
> > > > Hi Mika,
> > > > This sounds like a good idea. In terms of scope, is the idea that this is purely for unit tests, or is this additionally proposed as a validation/test harness for use when developing custom PTFs? I guess this allows us to create a common set of tests that all PTFs need to pass using this harness.
> > > >
> > > > I would assume there are real (not event) time considerations for some PTFs; it would be worth mentioning how we should handle that.
> > > >
> > > > Kind regards, David.
> > > >
> > > > From: Mika Naylor <[email protected]>
> > > > Date: Tuesday, 3 March 2026 at 16:46
> > > > To: [email protected] <[email protected]>
> > > > Subject: [EXTERNAL] [DISCUSS] FLIP-567: Introduce a ProcessTableFunction Test Harness
> > > >
> > > > Hey everyone!
> > > >
> > > > I would like to kick off a discussion on FLIP-567: Introduce a ProcessTableFunction Test Harness[1].
> > > >
> > > > Currently, testing PTFs requires full integration tests against a running Flink cluster. This FLIP would introduce a developer-friendly test harness for unit testing PTFs and would provide introspection into output, state, timers, and watermarks for assertions and behaviour validation. This would let developers iterate on and test their PTFs without needing to run a full-scale integration test against a live Flink cluster.
> > > >
> > > > Would love any thoughts and feedback the community might have on this proposal.
> > > >
> > > > Kind regards,
> > > > Mika Naylor
> > > >
> > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-567%3A+Introduce+a+ProcessTableFunction+Test+Harness
> > > >
> > > > Unless otherwise stated above:
> > > >
> > > > IBM United Kingdom Limited
> > > > Registered in England and Wales with number 741598
> > > > Registered office: Building C, IBM Hursley Office, Hursley Park Road, Winchester, Hampshire SO21 2JN
