This is an automated email from the ASF dual-hosted git repository. MartijnVisser pushed a commit to branch release-2.3 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 39e80d6b32a3af7ee7f2c00953e3b716afe4a46e
Author: Martijn Visser <[email protected]>
AuthorDate: Fri Apr 17 10:20:48 2026 +0200

    [FLINK-39477][docs] Add module-level AGENTS.md for table planner and runtime

    Extend the root guidance with module-local context for the two modules where AI agents most often need component-specific direction (ExecNode and rule layout, function base classes, plan serialization, harness and restore tests).

    Generated-by: Claude Code 2.1.112 (Claude Opus 4.7, 1M context)

    (cherry picked from commit bab97bb7542248a9dd113fa1c92c1174d203d882)
---
 flink-table/flink-table-planner/AGENTS.md | 117 ++++++++++++++++++++++++++++++
 flink-table/flink-table-runtime/AGENTS.md |  78 ++++++++++++++++++++
 2 files changed, 195 insertions(+)

diff --git a/flink-table/flink-table-planner/AGENTS.md b/flink-table/flink-table-planner/AGENTS.md
new file mode 100644
index 00000000000..969c5b36656
--- /dev/null
+++ b/flink-table/flink-table-planner/AGENTS.md
@@ -0,0 +1,117 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# flink-table-planner
+
+Translates and optimizes SQL/Table API programs into executable plans using Apache Calcite. Bridges the Table/SQL API and the runtime by generating code and execution plans.
The planner is loaded in a separate classloader (`flink-table-planner-loader`) to isolate Calcite dependencies.
+
+See also [README.md](README.md) for Immutables rule config conventions and JSON plan test regeneration.
+
+## Build Commands
+
+Full table modules rebuild:
+
+```
+./mvnw clean install -T1C -DskipTests -Pskip-webui-build -pl flink-table/flink-table-common,flink-table/flink-sql-parser,flink-table/flink-table-planner-loader,flink-table/flink-table-planner,flink-table/flink-table-api-java -am
+```
+
+After the first full build, drop `-am` for faster rebuilds when you're only changing code within these modules.
+
+## Key Directory Structure
+
+- `plan/rules/physical/stream/` and `plan/rules/physical/batch/` — Physical planner rules
+- `plan/rules/logical/` — Logical optimization rules
+- `plan/nodes/exec/stream/` and `plan/nodes/exec/batch/` — ExecNodes (bridge between planner and runtime)
+- `plan/nodes/exec/spec/` — Serializable operator specifications (JoinSpec, WindowSpec, etc.)
+- `plan/nodes/physical/stream/` and `plan/nodes/physical/batch/` — Intermediate physical nodes (Calcite-based)
+- `plan/nodes/logical/` — Logical nodes (Calcite-based)
+- `codegen/` — Code generation
+- `codegen/calls/` — Custom code generators for specific functions (e.g., `JsonCallGen.scala`)
+- `functions/casting/` — Cast rules for code generation (e.g., `BinaryToBinaryCastRule`, `StringToTimeCastRule`)
+- `functions/` — Function management and inference
+- `catalog/` — Catalog integration
+
+## Key Abstractions
+
+- **ExecNode**: Bridge between planner and runtime. Annotated with `@ExecNodeMetadata(name, version, minPlanVersion, minStateVersion)` for versioning and backwards compatibility. Extends `ExecNodeBase<T>` and implements either `StreamExecNode<T>` (streaming) or `BatchExecNode<T>` (batch); `T` is typically `RowData`.
+- **Physical rules**: Extend `RelRule`, use Immutables `@Value.Immutable` for config. Transform logical nodes to physical nodes.
+Registered in `FlinkStreamRuleSets` and/or `FlinkBatchRuleSets`.
+- **Logical optimization rules**: Also extend `RelRule`, often use `RexShuttle` for expression rewriting. Registered in rule sets.
+- **Specs**: Serializable specifications in `plan/nodes/exec/spec/` (JoinSpec, WindowSpec, etc.) that carry operator configuration.
+
+## Common Change Patterns
+
+### Adding a new table operator
+
+Components involved (can be developed top-down or bottom-up):
+
+1. **Runtime operator** in `flink-table-runtime` under `operators/` (extend `TableStreamOperator`, implement `OneInputStreamOperator` or `TwoInputStreamOperator`). Test with harness tests. See [flink-table-runtime AGENTS.md](../flink-table-runtime/AGENTS.md).
+2. **ExecNode** in `plan/nodes/exec/stream/` and/or `plan/nodes/exec/batch/` (extend `ExecNodeBase<T>`; implement `StreamExecNode<T>` for streaming or `BatchExecNode<T>` for batch; annotate with `@ExecNodeMetadata`; `T` is typically `RowData`)
+3. **Physical Node + Physical Rules** in `plan/rules/physical/stream/` and/or `plan/rules/physical/batch/` (physical rules usually extend `ConverterRule` via `Config.INSTANCE.withConversion(...)`; same-convention rewrites extend `RelRule` with an `@Value.Immutable` config)
+4. **Logical Node + Planner rule**
+5. Tests: semantic tests, plan tests, restore tests (if stateful)
+
+Both `stream/` and `batch/` directories exist for rules and ExecNodes. Consider whether your change applies to one or both.
+
+### Adding a planner optimization rule
+
+Pick the base class by what the rule does:
+- Converts a node from one calling convention to another (for example, logical → stream physical): extend `ConverterRule`.
+Call `ConverterRule.Config.INSTANCE.withConversion(...)` in the constructor, do not define your own config.
+- Rewrites nodes within the same convention (logical → logical, physical → physical): extend `RelRule` with an `@Value.Immutable` config.
+Some existing rules still use Calcite's older `RelOptRule`; prefer `RelRule` for new code.
+
+Then:
+1. Register in `FlinkStreamRuleSets.scala` and/or `FlinkBatchRuleSets.scala`
+2. Plan tests with XML golden files — when the test fails, copy the framework's generated log file over the reference `.xml` (cases are ordered alphabetically by method name)
+3. A same-convention rewrite needs no runtime changes. A `ConverterRule` that produces a new physical node also needs the physical node, ExecNode, and runtime operator — see "Adding a new table operator" above.
+
+### Extending SQL syntax
+
+1. Modify parser grammar in `flink-sql-parser` (`parserImpls.ftl`)
+2. Add operation conversion logic in `SqlNodeToOperationConversion.java`
+3. Test with parser tests and SQL gateway integration tests (`.q` files)
+
+### Code generation changes
+
+- Cast rules live in `functions/casting/`. Each extends `AbstractExpressionCodeGeneratorCastRule` or similar.
+- Custom call generators for functions live in `codegen/calls/` (e.g., `JsonCallGen.scala`). Simple scalar functions typically don't need these; the planner handles them uniformly through the function definition.
+- Immutables library is used for rule configs (`@Value.Immutable`, `@Value.Enclosing`). See [README.md](README.md).
+
+### Plan serialization changes
+
+- ExecNode specs use Jackson for JSON serialization. Source/sink specs should use `@JsonIgnoreProperties(ignoreUnknown = true)` for forward compatibility.
+- When adding new ExecNode features, update `RexNodeJsonDeserializer` or related serde classes if new function kinds or types are introduced.
+
+### ExecNode versioning
+
+When bumping an ExecNode version, update the `@ExecNodeMetadata` annotation's `version` and `minPlanVersion`/`minStateVersion` fields. Add restore test snapshots for the new version.
+
+### Configuration options
+
+New features often introduce `ExecutionConfigOptions` entries (in `flink-table-api-java`) for runtime tunability (e.g., cache sizes, timeouts, batch sizes).
+
+## Testing Patterns
+
+Choose test types based on what you're changing:
+
+- **Semantic tests** (for ExecNode/operator changes): Use `SemanticTestBase` (streaming) or `BatchSemanticTestBase` (batch) in `plan/nodes/exec/testutils/`. Extends `CommonSemanticTestBase` which implements `TableTestProgramRunner`. Prefer these over ITCase for operators and ExecNodes.
+- **Restore tests** (for stateful operators): Use `RestoreTestBase` or `BatchRestoreTestBase` in `plan/nodes/exec/testutils/`. Implements `TableTestProgramRunner`, uses `@ExtendWith(MiniClusterExtension.class)`. Required when your operator uses state. Tests savepoint creation and job restart in two phases: (1) generate compiled plans + savepoints, (2) verify recovery.
+- **Plan tests** (for optimization rules): Verify the generated execution plan using XML golden files. Used for logical and physical optimization rules.
+- **ITCase** (for built-in functions): Function tests typically use ITCase with `TestSetSpec` for end-to-end verification (e.g., `JsonFunctionsITCase`, `TimeFunctionsITCase`).
+- **JSON plan test regeneration:** Set `PLAN_TEST_FORCE_OVERWRITE=true` environment variable (documented in [README.md](README.md)).
diff --git a/flink-table/flink-table-runtime/AGENTS.md b/flink-table/flink-table-runtime/AGENTS.md
new file mode 100644
index 00000000000..cd61620f20c
--- /dev/null
+++ b/flink-table/flink-table-runtime/AGENTS.md
@@ -0,0 +1,78 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.
You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# flink-table-runtime
+
+Contains classes required by TaskManagers for execution of table programs. Implements runtime operators, built-in functions, and code generation support. Bundles janino (Java compiler for code generation) and flink-shaded-jsonpath.
+
+## Key Directory Structure
+
+- `functions/scalar/` — Scalar function implementations (47+)
+- `functions/aggregate/` — Aggregate function implementations
+- `functions/table/` — Table function implementations
+- `functions/ptf/` — Process table function implementations
+- `functions/ml/` — Machine learning function implementations
+- `operators/` — Runtime operators organized by type:
+  - `join/` (hash, sort-merge, lookup, temporal, interval, delta, adaptive, stream/multi-join)
+  - `aggregate/` (group, window)
+  - `window/` (TVF windows, group windows)
+  - `deduplicate/`, `rank/`, `sort/`
+  - `sink/`, `source/`
+  - `correlate/` (including `async/` for async table functions)
+  - `calc/`, `match/`, `over/`, `process/`, `ml/`, `search/`
+
+## Common Change Patterns
+
+### Adding a built-in function
+
+Base classes by function type:
+
+- **Scalar:** Extend `BuiltInScalarFunction` in `functions/scalar/`
+- **Table:** Extend `BuiltInTableFunction` in `functions/table/`
+- **Aggregate:** Extend `BuiltInAggregateFunction` in `functions/aggregate/`
+- **Process Table Function:** Extend `BuiltInProcessTableFunction` in `functions/ptf/`
+
+All are constructed from `BuiltInFunctionDefinition#specialize(SpecializedContext)` and work on internal data structures by default.
+
+Some functions also require custom code generators in the planner (e.g., `JsonCallGen.scala` for JSON functions). Simple scalar functions typically don't need planner changes; the planner handles them uniformly through the function definition.
+
+### Adding a runtime operator
+
+- **1 or 2 inputs:** Extend `TableStreamOperator<RowData>` (which extends `AbstractStreamOperator<OUT>`) and implement `OneInputStreamOperator<RowData, RowData>` or `TwoInputStreamOperator<RowData, RowData, RowData>`
+- **3+ inputs:** Extend `AbstractStreamOperatorV2` and implement `MultipleInputStreamOperator` (see `StreamingMultiJoinOperator`)
+- `TableStreamOperator` provides watermark tracking (`currentWatermark`), memory size computation, and a `ContextImpl` for timer services
+
+### Async operators and runners
+
+- **Key-ordered async execution:** `operators/join/lookup/keyordered/` contains async execution controller infrastructure (`AecRecord`, `Epoch`, `EpochManager`, `KeyAccountingUnit`, `RecordsBuffer`) for ordering guarantees in async lookup joins
+- **Async correlate:** `operators/correlate/async/` for async table function support
+- **Runner abstraction:** `AbstractFunctionRunner` and `AbstractAsyncFunctionRunner` provide base classes for code-generated function invocations (used by lookup join, ML predict, vector search runners)
+
+### State serializer migrations
+
+- When modifying state serializers, create a `TypeSerializerSnapshot` with version bumping
+- Migration test resources follow naming: `migration-flink-<version>-<backend>-<variant>-snapshot`
+- Rescaling tests verify state redistribution across parallelism changes (see `SinkUpsertMaterializerMigrationTest`, `SinkUpsertMaterializerRescalingTest`)
+
+## Testing Patterns
+
+- **Harness tests:** Use `OneInputStreamOperatorTestHarness<RowData, RowData>` with `RowDataHarnessAssertor` for output validation. See `operators/join/LookupJoinHarnessTest.java` as a reference.
+- **Test utilities:** `StreamRecordUtils.insertRecord()` for test records, `RowDataHarnessAssertor` for assertions
+- **Operator test base classes:** Module has dedicated base classes per operator type (e.g., `TemporalTimeJoinOperatorTestBase`, `Int2HashJoinOperatorTestBase`, `WindowAggOperatorTestBase`)
+- **State migration tests:** Use snapshot files per Flink version and state backend type to verify forward/backward compatibility
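
Illustration (outside the diff, not part of the commit): the runtime AGENTS.md states that migration test resources follow the pattern `migration-flink-<version>-<backend>-<variant>-snapshot`. The sketch below shows one way such names could be composed and enumerated per Flink version and state backend. `MigrationSnapshotNames` and its methods are hypothetical helpers rather than Flink classes, and the version, backend, and variant values are placeholders chosen only for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper for illustration only; this is not a Flink class. */
final class MigrationSnapshotNames {

    private MigrationSnapshotNames() {}

    /** Fills the pattern migration-flink-<version>-<backend>-<variant>-snapshot. */
    static String resourceName(String flinkVersion, String backend, String variant) {
        return String.format(
                "migration-flink-%s-%s-%s-snapshot", flinkVersion, backend, variant);
    }

    /** Returns one resource name per (version, backend) pair for a single variant. */
    static List<String> allResourceNames(
            List<String> flinkVersions, List<String> backends, String variant) {
        List<String> names = new ArrayList<>();
        for (String version : flinkVersions) {
            for (String backend : backends) {
                names.add(resourceName(version, backend, variant));
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // Placeholder values (assumptions); real tests define their own
        // versions, backends, and variants.
        List<String> versions = Arrays.asList("2.0", "2.1");
        List<String> backends = Arrays.asList("hashmap", "rocksdb");
        for (String name : allResourceNames(versions, backends, "example")) {
            System.out.println(name);
        }
    }
}
```

Enumerating the full version-by-backend grid makes a missing snapshot file easy to spot when a new Flink version or state backend is added to a migration test.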
