This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch release-2.3 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2283c216d3694f268163c30bd1d2628b4270af58 Author: Ramin Gharib <[email protected]> AuthorDate: Mon Apr 20 10:19:55 2026 +0200 [FLINK-39261][table] Add FROM_CHANGELOG built-in process table function This closes #27901. (cherry picked from commit 4997ca40a3f757088be05f117612066cc3382a4f) --- .../docs/sql/reference/queries/changelog.md | 112 ++++++++- .../pyflink/table/tests/test_table_completeness.py | 1 + .../java/org/apache/flink/table/api/Table.java | 27 +++ .../apache/flink/table/api/internal/TableImpl.java | 5 + .../table/functions/BuiltInFunctionDefinition.java | 28 ++- .../functions/BuiltInFunctionDefinitions.java | 27 +++ .../table/functions/ChangelogModeStrategy.java | 37 +++ .../strategies/FromChangelogTypeStrategy.java | 253 +++++++++++++++++++++ .../strategies/SpecificInputTypeStrategies.java | 4 + .../strategies/SpecificTypeStrategies.java | 4 + .../ValidationOnlyInputTypeStrategy.java | 49 ++++ .../FromChangelogInputTypeStrategyTest.java | 167 ++++++++++++++ .../FlinkChangelogModeInferenceProgram.scala | 32 ++- .../exec/stream/FromChangelogSemanticTests.java | 49 ++++ .../exec/stream/FromChangelogTestPrograms.java | 210 +++++++++++++++++ .../exec/stream/ProcessTableFunctionTestUtils.java | 15 ++ .../planner/plan/stream/sql/FromChangelogTest.java | 77 +++++++ .../plan/stream/sql/ProcessTableFunctionTest.java | 14 +- .../planner/plan/stream/sql/FromChangelogTest.xml | 57 +++++ .../plan/stream/sql/ProcessTableFunctionTest.xml | 20 ++ .../functions/ptf/FromChangelogFunction.java | 147 ++++++++++++ 21 files changed, 1324 insertions(+), 11 deletions(-) diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md index 61843ab5c2f..f4449299388 100644 --- a/docs/content/docs/sql/reference/queries/changelog.md +++ b/docs/content/docs/sql/reference/queries/changelog.md @@ -30,9 +30,119 @@ Flink SQL provides built-in process table functions (PTFs) for working with chan | Function | Description | 
|:---------|:------------| +| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with operation codes into a (potentially updating) dynamic table | | [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only table with explicit operation codes | -<!-- Placeholder for future FROM_CHANGELOG function --> +## FROM_CHANGELOG + +The `FROM_CHANGELOG` PTF converts an append-only table with an explicit operation code column into a (potentially updating) dynamic table. Each input row is expected to have a string column that indicates the change operation. The operation column is interpreted by the engine and removed from the output. + +This is useful when consuming Change Data Capture (CDC) streams from systems like Debezium where events arrive as flat append-only records with an explicit operation field. It is also useful in combination with the TO_CHANGELOG function, for converting an append-only table back into an updating table after applying transformations to the events. + +Note: This version requires that your CDC data encodes updates using a full image (i.e. providing separate events for before and after the update). Please double-check whether your source provides both UPDATE_BEFORE and UPDATE_AFTER events. FROM_CHANGELOG is a very powerful function, but it might produce incorrect results in subsequent operations and tables if it is not configured correctly. + +### Syntax + +```sql +SELECT * FROM FROM_CHANGELOG( + input => TABLE source_table, + [op => DESCRIPTOR(op_column_name),] + [op_mapping => MAP[ + 'c, r', 'INSERT', + 'ub', 'UPDATE_BEFORE', + 'ua', 'UPDATE_AFTER', + 'd', 'DELETE' + ]] +) +``` + +### Parameters + +| Parameter | Required | Description [...] 
+|:-------------|:---------|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...] +| `input` | Yes | The input table. Must be append-only. [...] +| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. The column must exist in the input table and be of type STRING. [...] +| `op_mapping` | No | A `MAP<STRING, STRING>` mapping user-defined codes to Flink change operation names. Keys are user-defined codes (e.g., `'c'`, `'u'`, `'d'`), values are Flink change operation names (`INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated codes to map multiple codes to the same operation (e.g., `'c, r'`). When provided, only mapped codes are forwarded - unmapped codes are dropped. Each change operation may appear at most once acro [...] + +#### Default op_mapping + +When `op_mapping` is omitted, the following standard names are used. They allow a reverse conversion from TO_CHANGELOG by default. + +| Input code | Change operation | +|:-------------------|:------------------| +| `'INSERT'` | INSERT | +| `'UPDATE_BEFORE'` | UPDATE_BEFORE | +| `'UPDATE_AFTER'` | UPDATE_AFTER | +| `'DELETE'` | DELETE | + +### Output Schema + +The output contains all input columns except the operation code (e.g., op) column, which is interpreted by Flink's SQL engine and removed. Each output row carries the appropriate change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE). 
+ +``` +[all_input_columns_without_op] +``` + +### Examples + +#### Basic usage with standard op names + +```sql +-- Input (append-only): +-- +I[id:1, op:'INSERT', name:'Alice'] +-- +I[id:1, op:'UPDATE_BEFORE', name:'Alice'] +-- +I[id:1, op:'UPDATE_AFTER', name:'Alice2'] +-- +I[id:2, op:'DELETE', name:'Bob'] + +SELECT * FROM FROM_CHANGELOG( + input => TABLE cdc_stream +) + +-- Output (updating table): +-- +I[id:1, name:'Alice'] +-- -U[id:1, name:'Alice'] +-- +U[id:1, name:'Alice2'] +-- -D[id:2, name:'Bob'] + +-- Table state after all events: +-- | id | name | +-- |----|--------| +-- | 1 | Alice2 | +``` + +#### Custom operation column name + +```sql +-- Source schema: id INT, operation STRING, name STRING +SELECT * FROM FROM_CHANGELOG( + input => TABLE cdc_stream, + op => DESCRIPTOR(operation) +) +-- The operation column named 'operation' is used instead of 'op' +``` + +#### Table API + +```java +Table cdcStream = ...; + +// Default: reads 'op' column with standard change operation names +Table result = cdcStream.fromChangelog(); + +// With custom op column name +Table result = cdcStream.fromChangelog( + descriptor("operation").asArgument("op") +); + +// With custom op_mapping +Table result = cdcStream.fromChangelog( + descriptor("op").asArgument("op"), + map("c, r", "INSERT", + "ub", "UPDATE_BEFORE", + "ua", "UPDATE_AFTER", + "d", "DELETE").asArgument("op_mapping") +); +``` ## TO_CHANGELOG diff --git a/flink-python/pyflink/table/tests/test_table_completeness.py b/flink-python/pyflink/table/tests/test_table_completeness.py index feca7e63b1a..1f2b920474f 100644 --- a/flink-python/pyflink/table/tests/test_table_completeness.py +++ b/flink-python/pyflink/table/tests/test_table_completeness.py @@ -42,6 +42,7 @@ class TableAPICompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCase): 'asArgument', 'process', 'partitionBy', + 'fromChangelog', } @classmethod diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java index ef1785aecdd..172adfe1354 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java @@ -1454,4 +1454,31 @@ public interface Table extends Explainable<Table>, Executable { * @return an append-only {@link Table} with an {@code op} column prepended to the input columns */ Table toChangelog(Expression... arguments); + + /** + * Converts this append-only table with an explicit operation code column into a dynamic table + * using the built-in {@code FROM_CHANGELOG} process table function. + * + * <p>Each input row is expected to have a string operation code column (default: {@code "op"}) + * that indicates the change operation (e.g., INSERT, UPDATE_AFTER, UPDATE_BEFORE, DELETE). The + * output table is a dynamic table backed by a changelog stream. + * + * <p>Optional arguments can be passed using named expressions: + * + * <pre>{@code + * // Default: reads 'op' column with standard change operation names + * table.fromChangelog(); + * + * // Custom op column name and mapping (UPDATE_BEFORE is required alongside UPDATE_AFTER) + * table.fromChangelog( + * descriptor("__op").asArgument("op"), + * map("c, r", "INSERT", "ub", "UPDATE_BEFORE", "ua", "UPDATE_AFTER", "d", "DELETE").asArgument("op_mapping") + * ); + * }</pre> + * + * @param arguments optional named arguments for {@code op} and {@code op_mapping} + * @return a dynamic {@link Table} with the op column removed and proper change operation + * semantics + */ + Table fromChangelog(Expression... 
arguments); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java index f63d4032067..a539977532f 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java @@ -497,6 +497,11 @@ public class TableImpl implements Table { return new PartitionedTableImpl(this, Arrays.asList(fields)); } + @Override + public Table fromChangelog(Expression... arguments) { + return process(BuiltInFunctionDefinitions.FROM_CHANGELOG.getName(), (Object[]) arguments); + } + @Override public ApiExpression asArgument(String name) { return createArgumentExpression(operationTree, tableEnvironment, name); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java index 77cfaf71667..03e4780e018 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java @@ -72,6 +72,8 @@ public final class BuiltInFunctionDefinition implements SpecializedFunction { private final SqlCallSyntax sqlCallSyntax; + private final @Nullable ChangelogModeStrategy changelogModeStrategy; + private final String sqlName; private BuiltInFunctionDefinition( @@ -84,7 +86,8 @@ public final class BuiltInFunctionDefinition implements SpecializedFunction { boolean isDeterministic, boolean isRuntimeProvided, String runtimeClass, - boolean isInternal) { + boolean isInternal, + @Nullable ChangelogModeStrategy changelogModeStrategy) { this.name = checkNotNull(name, "Name must not be 
null."); this.sqlName = sqlName; this.version = isInternal ? null : version; @@ -95,6 +98,7 @@ public final class BuiltInFunctionDefinition implements SpecializedFunction { this.runtimeClass = runtimeClass; this.isInternal = isInternal; this.sqlCallSyntax = sqlCallSyntax; + this.changelogModeStrategy = changelogModeStrategy; validateFunction(this.name, this.version, this.isInternal); } @@ -131,6 +135,14 @@ public final class BuiltInFunctionDefinition implements SpecializedFunction { return isInternal; } + /** + * Returns the optional {@link ChangelogModeStrategy} for built-in PTFs that emit updates (e.g., + * FROM_CHANGELOG). The planner uses this to determine the output changelog mode. + */ + public Optional<ChangelogModeStrategy> getChangelogModeStrategy() { + return Optional.ofNullable(changelogModeStrategy); + } + public String getQualifiedName() { if (isInternal) { return name; @@ -253,6 +265,8 @@ public final class BuiltInFunctionDefinition implements SpecializedFunction { private SqlCallSyntax sqlCallSyntax = SqlCallSyntax.FUNCTION; + private @Nullable ChangelogModeStrategy changelogModeStrategy; + public Builder() { // default constructor to allow a fluent definition } @@ -399,6 +413,15 @@ public final class BuiltInFunctionDefinition implements SpecializedFunction { return this; } + /** + * Sets the {@link ChangelogModeStrategy} that determines the output changelog mode for this + * built-in PTF. Only needed for PTFs that emit updates (e.g., FROM_CHANGELOG). 
+ */ + public Builder changelogModeStrategy(ChangelogModeStrategy changelogModeStrategy) { + this.changelogModeStrategy = changelogModeStrategy; + return this; + } + public BuiltInFunctionDefinition build() { return new BuiltInFunctionDefinition( name, @@ -410,7 +433,8 @@ public final class BuiltInFunctionDefinition implements SpecializedFunction { isDeterministic, isRuntimeProvided, runtimeClass, - isInternal); + isInternal, + changelogModeStrategy); } } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index 0f2f35a056a..0ee62f1116a 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -27,6 +27,7 @@ import org.apache.flink.table.api.JsonQueryWrapper; import org.apache.flink.table.api.JsonType; import org.apache.flink.table.api.JsonValueOnEmptyOrError; import org.apache.flink.table.api.TableException; +import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.expressions.TimeIntervalUnit; import org.apache.flink.table.expressions.TimePointUnit; import org.apache.flink.table.expressions.ValueLiteralExpression; @@ -106,6 +107,7 @@ import static org.apache.flink.table.types.inference.TypeStrategies.nullableIfAr import static org.apache.flink.table.types.inference.TypeStrategies.varyingString; import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ARRAY_ELEMENT_ARG; import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ARRAY_FULLY_COMPARABLE; +import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.FROM_CHANGELOG_INPUT_TYPE_STRATEGY; import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.INDEX; import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.JSON_ARGUMENT; import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ML_PREDICT_INPUT_TYPE_STRATEGY; @@ -115,6 +117,7 @@ import static org.apache.flink.table.types.inference.strategies.SpecificInputTyp import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.percentage; import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.percentageArray; import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.ARRAY_APPEND_PREPEND; +import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.FROM_CHANGELOG_OUTPUT_TYPE_STRATEGY; import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.ML_PREDICT_OUTPUT_TYPE_STRATEGY; import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.TO_CHANGELOG_OUTPUT_TYPE_STRATEGY; @@ -809,6 +812,30 @@ public final class BuiltInFunctionDefinitions { "org.apache.flink.table.runtime.functions.ptf.ToChangelogFunction") .build(); + public static final BuiltInFunctionDefinition FROM_CHANGELOG = + BuiltInFunctionDefinition.newBuilder() + .name("FROM_CHANGELOG") + .kind(PROCESS_TABLE) + .staticArguments( + StaticArgument.table( + "input", + Row.class, + false, + EnumSet.of( + StaticArgumentTrait.TABLE, + StaticArgumentTrait.ROW_SEMANTIC_TABLE)), + StaticArgument.scalar("op", DataTypes.DESCRIPTOR(), true), + StaticArgument.scalar( + "op_mapping", + DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()), + true)) + .changelogModeStrategy(ctx -> ChangelogMode.all()) + .inputTypeStrategy(FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .outputTypeStrategy(FROM_CHANGELOG_OUTPUT_TYPE_STRATEGY) + .runtimeClass( + "org.apache.flink.table.runtime.functions.ptf.FromChangelogFunction") + 
.build(); + public static final BuiltInFunctionDefinition GREATEST = BuiltInFunctionDefinition.newBuilder() .name("GREATEST") diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogModeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogModeStrategy.java new file mode 100644 index 00000000000..53a561c74a4 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogModeStrategy.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.functions.ChangelogFunction.ChangelogContext; +import org.apache.flink.table.types.inference.TypeStrategy; + +/** + * Strategy for determining the output {@link ChangelogMode} of a built-in process table function. + * + * <p>Similar to {@link TypeStrategy}, this is used to declare changelog semantics in the function + * definition rather than implementing the {@link ChangelogFunction} interface. 
+ */ +@Internal +public interface ChangelogModeStrategy { + + /** Infers the output {@link ChangelogMode} based on the given {@link ChangelogContext}. */ + ChangelogMode inferChangelogMode(ChangelogContext changelogContext); +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java new file mode 100644 index 00000000000..4d7b25fbd60 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.DataTypes.Field; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.functions.TableSemantics; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.InputTypeStrategy; +import org.apache.flink.table.types.inference.TypeStrategy; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.types.ColumnList; +import org.apache.flink.types.RowKind; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** Type strategies for the {@code FROM_CHANGELOG} process table function. */ +@Internal +public final class FromChangelogTypeStrategy { + + private static final String DEFAULT_OP_COLUMN_NAME = "op"; + + private static final Set<String> VALID_ROW_KIND_NAMES = + Set.of("INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE"); + + // -------------------------------------------------------------------------------------------- + // Input validation + // -------------------------------------------------------------------------------------------- + + public static final InputTypeStrategy INPUT_TYPE_STRATEGY = + new ValidationOnlyInputTypeStrategy() { + @Override + public Optional<List<DataType>> inferInputTypes( + final CallContext callContext, final boolean throwOnFailure) { + return validateInputs(callContext, throwOnFailure); + } + }; + + // -------------------------------------------------------------------------------------------- + // Output type inference + // -------------------------------------------------------------------------------------------- + + public static final TypeStrategy OUTPUT_TYPE_STRATEGY = 
+ callContext -> { + final TableSemantics tableSemantics = + callContext + .getTableSemantics(0) + .orElseThrow( + () -> + new ValidationException( + "First argument must be a table for FROM_CHANGELOG.")); + + final String opColumnName = resolveOpColumnName(callContext); + + final List<Field> outputFields = buildOutputFields(tableSemantics, opColumnName); + + return Optional.of(DataTypes.ROW(outputFields).notNull()); + }; + private static final String UPDATE_BEFORE = RowKind.UPDATE_BEFORE.name(); + private static final String UPDATE_AFTER = RowKind.UPDATE_AFTER.name(); + + // -------------------------------------------------------------------------------------------- + // Helpers + // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("rawtypes") + private static Optional<List<DataType>> validateInputs( + final CallContext callContext, final boolean throwOnFailure) { + Optional<List<DataType>> error; + + error = validateTableArg(callContext, throwOnFailure); + if (error.isPresent()) { + return error; + } + + error = validateOpDescriptor(callContext, throwOnFailure); + if (error.isPresent()) { + return error; + } + + error = validateOpColumn(callContext, throwOnFailure); + if (error.isPresent()) { + return error; + } + + error = validateOpMapping(callContext, throwOnFailure); + if (error.isPresent()) { + return error; + } + + return Optional.of(callContext.getArgumentDataTypes()); + } + + private static Optional<List<DataType>> validateTableArg( + final CallContext callContext, final boolean throwOnFailure) { + if (callContext.getTableSemantics(0).isEmpty()) { + return callContext.fail( + throwOnFailure, "First argument must be a table for FROM_CHANGELOG."); + } + return Optional.empty(); + } + + private static Optional<List<DataType>> validateOpDescriptor( + final CallContext callContext, final boolean throwOnFailure) { + final Optional<ColumnList> opDescriptor = callContext.getArgumentValue(1, 
ColumnList.class); + if (opDescriptor.isPresent() && opDescriptor.get().getNames().size() != 1) { + return callContext.fail( + throwOnFailure, + "The descriptor for argument 'op' must contain exactly one column name."); + } + return Optional.empty(); + } + + /** Validates that the op column exists in the input schema and is of STRING type. */ + private static Optional<List<DataType>> validateOpColumn( + final CallContext callContext, final boolean throwOnFailure) { + + final TableSemantics tableSemantics = callContext.getTableSemantics(0).get(); + final String opColumnName = resolveOpColumnName(callContext); + final List<Field> inputFields = DataType.getFields(tableSemantics.dataType()); + final Optional<Field> opField = + inputFields.stream().filter(f -> f.getName().equals(opColumnName)).findFirst(); + if (opField.isEmpty()) { + return callContext.fail( + throwOnFailure, + String.format( + "The op column '%s' does not exist in the input schema.", + opColumnName)); + } + if (!opField.get().getDataType().getLogicalType().is(LogicalTypeFamily.CHARACTER_STRING)) { + return callContext.fail( + throwOnFailure, + String.format( + "The op column '%s' must be of STRING type, but was '%s'.", + opColumnName, opField.get().getDataType().getLogicalType())); + } + return Optional.empty(); + } + + /** Validates op_mapping is a literal and its values are valid change operation names. 
*/ + @SuppressWarnings("unchecked") + private static Optional<List<DataType>> validateOpMapping( + final CallContext callContext, final boolean throwOnFailure) { + final boolean hasMappingArgProvided = !callContext.isArgumentNull(2); + final boolean isMappingArgLiteral = callContext.isArgumentLiteral(2); + if (hasMappingArgProvided && !isMappingArgLiteral) { + return callContext.fail( + throwOnFailure, "The 'op_mapping' argument must be a constant MAP literal."); + } + + final Optional<Map> opMapping = callContext.getArgumentValue(2, Map.class); + if (opMapping.isPresent()) { + final Map<String, String> mapping = opMapping.get(); + final Optional<List<DataType>> validationError = + validateOpMappingValues(callContext, mapping, throwOnFailure); + if (validationError.isPresent()) { + return validationError; + } + + final boolean hasUpdateBefore = + mapping.values().stream().anyMatch(v -> UPDATE_BEFORE.equals(v.trim())); + final boolean hasUpdateAfter = + mapping.values().stream().anyMatch(v -> UPDATE_AFTER.equals(v.trim())); + if (hasUpdateAfter && !hasUpdateBefore) { + return callContext.fail( + throwOnFailure, + "The 'op_mapping' must include UPDATE_BEFORE for retract mode. " + + "Upsert mode (without UPDATE_BEFORE) is not supported " + + "in this version."); + } + } + return Optional.empty(); + } + + /** + * Validates op_mapping values. Values must be valid Flink change operation names. Each name + * must appear at most once across all entries. + */ + private static Optional<List<DataType>> validateOpMappingValues( + final CallContext callContext, + final Map<String, String> opMapping, + final boolean throwOnFailure) { + final Set<String> allRowKindsSeen = new HashSet<>(); + + for (final String value : opMapping.values()) { + final String rowKindName = value.trim(); + if (!VALID_ROW_KIND_NAMES.contains(rowKindName)) { + return callContext.fail( + throwOnFailure, + String.format( + "Invalid target mapping for argument 'op_mapping'. 
" + + "Unknown change operation: '%s'. Valid values are: %s.", + rowKindName, VALID_ROW_KIND_NAMES)); + } + final boolean isDuplicate = !allRowKindsSeen.add(rowKindName); + if (isDuplicate) { + return callContext.fail( + throwOnFailure, + String.format( + "Invalid target mapping for argument 'op_mapping'. " + + "Duplicate change operation: '%s'. " + + "Use comma-separated keys to map multiple codes to the same operation " + + "(e.g., MAP['c, r', 'INSERT']).", + rowKindName)); + } + } + return Optional.empty(); + } + + private static String resolveOpColumnName(final CallContext callContext) { + return callContext + .getArgumentValue(1, ColumnList.class) + .filter(cl -> !cl.getNames().isEmpty()) + .map(cl -> cl.getNames().get(0)) + .orElse(DEFAULT_OP_COLUMN_NAME); + } + + private static List<Field> buildOutputFields( + final TableSemantics tableSemantics, final String opColumnName) { + final List<Field> inputFields = DataType.getFields(tableSemantics.dataType()); + + // Exclude the op column (becomes RowKind), keep all other columns + return inputFields.stream() + .filter(f -> !f.getName().equals(opColumnName)) + .collect(Collectors.toList()); + } + + private FromChangelogTypeStrategy() {} +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java index 4455b083ce2..b4cd96e0349 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java @@ -126,6 +126,10 @@ public final class SpecificInputTypeStrategies { public static final InputTypeStrategy TO_CHANGELOG_INPUT_TYPE_STRATEGY = ToChangelogTypeStrategy.INPUT_TYPE_STRATEGY; + /** Input strategy for 
{@link BuiltInFunctionDefinitions#FROM_CHANGELOG}. */ + public static final InputTypeStrategy FROM_CHANGELOG_INPUT_TYPE_STRATEGY = + FromChangelogTypeStrategy.INPUT_TYPE_STRATEGY; + /** See {@link ExtractInputTypeStrategy}. */ public static final InputTypeStrategy EXTRACT = new ExtractInputTypeStrategy(); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java index c1b854cad7c..06b6cbb2d3b 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java @@ -202,6 +202,10 @@ public final class SpecificTypeStrategies { public static final TypeStrategy TO_CHANGELOG_OUTPUT_TYPE_STRATEGY = ToChangelogTypeStrategy.OUTPUT_TYPE_STRATEGY; + /** Type strategy specific for {@link BuiltInFunctionDefinitions#FROM_CHANGELOG}. */ + public static final TypeStrategy FROM_CHANGELOG_OUTPUT_TYPE_STRATEGY = + FromChangelogTypeStrategy.OUTPUT_TYPE_STRATEGY; + private SpecificTypeStrategies() { // no instantiation } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ValidationOnlyInputTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ValidationOnlyInputTypeStrategy.java new file mode 100644 index 00000000000..0744076c164 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ValidationOnlyInputTypeStrategy.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.inference.ArgumentCount; +import org.apache.flink.table.types.inference.ConstantArgumentCount; +import org.apache.flink.table.types.inference.InputTypeStrategy; +import org.apache.flink.table.types.inference.Signature; +import org.apache.flink.table.types.inference.StaticArgument; + +import java.util.List; + +/** + * Base class for input type strategies that only perform validation. Argument count and signatures + * are handled by {@link StaticArgument}s in the function definition. + * + * <p>Subclasses only need to implement {@link #inferInputTypes} for custom validation logic. 
+ */ +@Internal +public abstract class ValidationOnlyInputTypeStrategy implements InputTypeStrategy { + + @Override + public ArgumentCount getArgumentCount() { + return ConstantArgumentCount.any(); + } + + @Override + public List<Signature> getExpectedSignatures(FunctionDefinition definition) { + return List.of(Signature.of()); + } +} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java new file mode 100644 index 00000000000..c059598df66 --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.InputTypeStrategiesTestBase; +import org.apache.flink.table.types.inference.utils.TableSemanticsMock; +import org.apache.flink.types.ColumnList; + +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.FROM_CHANGELOG_INPUT_TYPE_STRATEGY; + +/** Tests for {@link FromChangelogTypeStrategy#INPUT_TYPE_STRATEGY}. */ +class FromChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase { + + private static final DataType TABLE_TYPE = + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT()), + DataTypes.FIELD("op", DataTypes.STRING()), + DataTypes.FIELD("name", DataTypes.STRING())); + + private static final DataType DESCRIPTOR_TYPE = DataTypes.DESCRIPTOR(); + + private static final DataType MAP_TYPE = DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()); + + @Override + protected Stream<TestSpec> testData() { + return Stream.of( + // Valid: custom mapping with all change operations + TestSpec.forStrategy( + "Valid with custom mapping", FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of(List.of("op"))) + .calledWithLiteralAt( + 2, + Map.of( + "c", "INSERT", + "ub", "UPDATE_BEFORE", + "ua", "UPDATE_AFTER", + "d", "DELETE")) + .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE), + + // Valid: retract-style mapping with UPDATE_BEFORE + TestSpec.forStrategy("Valid with UPDATE_BEFORE", FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, 
ColumnList.of(List.of("op"))) + .calledWithLiteralAt( + 2, + Map.of( + "c", "INSERT", + "ub", "UPDATE_BEFORE", + "ua", "UPDATE_AFTER", + "d", "DELETE")) + .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE), + + // Error: op column not found + TestSpec.forStrategy( + "Op column not found in schema", FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of(List.of("nonexistent"))) + .expectErrorMessage("The op column 'nonexistent' does not exist"), + + // Error: op column is not STRING + TestSpec.forStrategy("Op column wrong type", FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes( + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT()), + DataTypes.FIELD("op", DataTypes.INT()), + DataTypes.FIELD("name", DataTypes.STRING())), + DESCRIPTOR_TYPE) + .calledWithTableSemanticsAt( + 0, + new TableSemanticsMock( + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT()), + DataTypes.FIELD("op", DataTypes.INT()), + DataTypes.FIELD("name", DataTypes.STRING())))) + .calledWithLiteralAt(1, ColumnList.of(List.of("op"))) + .expectErrorMessage("must be of STRING type"), + + // Error: multi-column descriptor + TestSpec.forStrategy( + "Descriptor with multiple columns", + FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of(List.of("a", "b"))) + .expectErrorMessage("must contain exactly one column name"), + + // Error: invalid RowKind in op_mapping value + TestSpec.forStrategy( + "Invalid RowKind in mapping value", + FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of(List.of("op"))) + .calledWithLiteralAt(2, 
Map.of("c", "INVALID_KIND")) + .expectErrorMessage("Unknown change operation: 'INVALID_KIND'"), + + // Error: duplicate RowKind across entries + TestSpec.forStrategy( + "Duplicate RowKind in mapping values", + FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of(List.of("op"))) + .calledWithLiteralAt(2, Map.of("c", "INSERT", "r", "INSERT")) + .expectErrorMessage("Duplicate change operation: 'INSERT'"), + + // Valid: INSERT-only mapping (append mode, no updates) + TestSpec.forStrategy( + "Valid INSERT-only mapping", FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of(List.of("op"))) + .calledWithLiteralAt(2, Map.of("c, r", "INSERT")) + .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE), + + // Valid: INSERT + DELETE mapping (no updates) + TestSpec.forStrategy( + "Valid INSERT and DELETE mapping", + FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of(List.of("op"))) + .calledWithLiteralAt(2, Map.of("c", "INSERT", "d", "DELETE")) + .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE), + + // Error: UPDATE_AFTER without UPDATE_BEFORE not supported + TestSpec.forStrategy( + "UPDATE_AFTER requires UPDATE_BEFORE", + FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of(List.of("op"))) + .calledWithLiteralAt( + 2, + Map.of( + "c", "INSERT", + "u", "UPDATE_AFTER", + "d", "DELETE")) + .expectErrorMessage("must include UPDATE_BEFORE")); + 
} +} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala index 56c9cb1262a..b6767e68ac6 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala @@ -23,7 +23,7 @@ import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.api.config.ExecutionConfigOptions.UpsertMaterialize import org.apache.flink.table.connector.ChangelogMode -import org.apache.flink.table.functions.ChangelogFunction +import org.apache.flink.table.functions.{BuiltInFunctionDefinition, ChangelogFunction} import org.apache.flink.table.functions.ChangelogFunction.ChangelogContext import org.apache.flink.table.planner.calcite.{FlinkTypeFactory, RexTableArgCall} import org.apache.flink.table.planner.plan.`trait`._ @@ -1711,16 +1711,36 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti val changelogContext = toPtfChangelogContext(process, inputChangelogModes, requiredChangelogMode) val changelogMode = changelogFunction.getChangelogMode(changelogContext) - if (!changelogMode.containsOnly(RowKind.INSERT)) { - verifyPtfTableArgsForUpdates(call) - } + verifyPtfTableArgsForUpdates(call, changelogMode) + toTraitSet(changelogMode) + case builtIn: BuiltInFunctionDefinition if builtIn.getChangelogModeStrategy.isPresent => + val inputChangelogModes = children.map(toChangelogMode(_, None, None)) + val changelogContext = + toPtfChangelogContext(process, inputChangelogModes, requiredChangelogMode) 
+ val changelogMode = + builtIn.getChangelogModeStrategy.get().inferChangelogMode(changelogContext) + verifyPtfTableArgsForUpdates(call, changelogMode) toTraitSet(changelogMode) case _ => defaultTraitSet } } - private def verifyPtfTableArgsForUpdates(call: RexCall): Unit = { + /** + * Verifies that PTFs with upsert output (without UPDATE_BEFORE) use set semantics. + * + * Retract mode (with UPDATE_BEFORE) is self-describing — each update carries both the old and + * new value, so downstream can process it without a key. Row semantics is safe. + * + * Upsert mode (without UPDATE_BEFORE) requires a key to look up previous values, so set semantics + * with PARTITION BY is required. + */ + private def verifyPtfTableArgsForUpdates(call: RexCall, changelogMode: ChangelogMode): Unit = { + if ( + changelogMode.containsOnly(RowKind.INSERT) || changelogMode.contains(RowKind.UPDATE_BEFORE) + ) { + return + } StreamPhysicalProcessTableFunction .getProvidedInputArgs(call) .map(_.e) @@ -1728,7 +1748,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti tableArg => if (tableArg.is(StaticArgumentTrait.ROW_SEMANTIC_TABLE)) { throw new ValidationException( - s"PTFs that take table arguments with row semantics don't support updating output. " + + s"PTFs that take table arguments with row semantics don't support upsert output.
" + s"Table argument '${tableArg.getName}' of function '${call.getOperator.toString}' " + s"must use set semantics.") } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java new file mode 100644 index 00000000000..022a8d75450 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.planner.plan.nodes.exec.testutils.SemanticTestBase; +import org.apache.flink.table.test.program.TableTestProgram; + +import java.util.List; + +/** Semantic tests for the built-in FROM_CHANGELOG process table function. 
*/ +public class FromChangelogSemanticTests extends SemanticTestBase { + + @Override + protected void applyDefaultEnvironmentOptions(TableConfig config) { + super.applyDefaultEnvironmentOptions(config); + config.set( + OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY, + OptimizerConfigOptions.NonDeterministicUpdateStrategy.IGNORE); + } + + @Override + public List<TableTestProgram> programs() { + return List.of( + FromChangelogTestPrograms.DEFAULT_OP_MAPPING, + FromChangelogTestPrograms.CUSTOM_OP_MAPPING, + FromChangelogTestPrograms.UNMAPPED_CODES_DROPPED, + FromChangelogTestPrograms.CUSTOM_OP_NAME, + FromChangelogTestPrograms.TABLE_API_DEFAULT, + FromChangelogTestPrograms.ROUND_TRIP); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java new file mode 100644 index 00000000000..d5554b12124 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +/** {@link TableTestProgram} definitions for testing the built-in FROM_CHANGELOG PTF. */ +public class FromChangelogTestPrograms { + + private static final String[] SIMPLE_CDC_SCHEMA = {"id INT", "op STRING", "name STRING"}; + + // -------------------------------------------------------------------------------------------- + // SQL tests + // -------------------------------------------------------------------------------------------- + + public static final TableTestProgram DEFAULT_OP_MAPPING = + TableTestProgram.of( + "from-changelog-default-op-mapping", + "default mapping with standard op names") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema(SIMPLE_CDC_SCHEMA) + .producedValues( + Row.of(1, "INSERT", "Alice"), + Row.of(2, "INSERT", "Bob"), + Row.of(1, "UPDATE_BEFORE", "Alice"), + Row.of(1, "UPDATE_AFTER", "Alice2"), + Row.of(2, "DELETE", "Bob")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("id INT", "name STRING") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob"), + Row.ofKind(RowKind.UPDATE_BEFORE, 1, "Alice"), + Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2"), + Row.ofKind(RowKind.DELETE, 2, "Bob")) + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream)") + .build(); + + public static final TableTestProgram CUSTOM_OP_MAPPING = + TableTestProgram.of( + "from-changelog-custom-op-mapping", + "custom op_mapping with 
comma-separated keys") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema(SIMPLE_CDC_SCHEMA) + .producedValues( + Row.of(1, "c", "Alice"), + Row.of(2, "r", "Bob"), + Row.of(1, "ub", "Alice"), + Row.of(1, "ua", "Alice2"), + Row.of(2, "d", "Bob")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("id INT", "name STRING") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob"), + Row.ofKind(RowKind.UPDATE_BEFORE, 1, "Alice"), + Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2"), + Row.ofKind(RowKind.DELETE, 2, "Bob")) + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream, " + + "op_mapping => MAP['c, r', 'INSERT', 'ub', 'UPDATE_BEFORE', 'ua', 'UPDATE_AFTER', 'd', 'DELETE'])") + .build(); + + public static final TableTestProgram UNMAPPED_CODES_DROPPED = + TableTestProgram.of( + "from-changelog-unmapped-codes-dropped", + "unmapped op codes are silently dropped") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema(SIMPLE_CDC_SCHEMA) + .producedValues( + Row.of(1, "INSERT", "Alice"), + Row.of(2, "INSERT", "Bob"), + Row.of(1, "UNKNOWN", "Alice2"), + Row.of(2, "DELETE", "Bob")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("id INT", "name STRING") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob"), + Row.ofKind(RowKind.DELETE, 2, "Bob")) + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream)") + .build(); + + /** Custom op column name via DESCRIPTOR. 
*/ + public static final TableTestProgram CUSTOM_OP_NAME = + TableTestProgram.of( + "from-changelog-custom-op-name", "custom op column name via DESCRIPTOR") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema("id INT", "operation STRING", "name STRING") + .producedValues( + Row.of(1, "INSERT", "Alice"), + Row.of(1, "UPDATE_BEFORE", "Alice"), + Row.of(1, "UPDATE_AFTER", "Alice2")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("id INT", "name STRING") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.UPDATE_BEFORE, 1, "Alice"), + Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2")) + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream, " + + "op => DESCRIPTOR(operation))") + .build(); + + // -------------------------------------------------------------------------------------------- + // Table API test + // -------------------------------------------------------------------------------------------- + + public static final TableTestProgram TABLE_API_DEFAULT = + TableTestProgram.of( + "from-changelog-table-api-default", + "Table.fromChangelog() convenience method") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema(SIMPLE_CDC_SCHEMA) + .producedValues( + Row.of(1, "INSERT", "Alice"), + Row.of(2, "INSERT", "Bob")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("id INT", "name STRING") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob")) + .build()) + .runTableApi(env -> env.from("cdc_stream").fromChangelog(), "sink") + .build(); + + // -------------------------------------------------------------------------------------------- + // Round-trip test: FROM_CHANGELOG(TO_CHANGELOG(table)) + // -------------------------------------------------------------------------------------------- + + /** Verifies that FROM_CHANGELOG(TO_CHANGELOG(table)) 
recovers the original dynamic table. */ + public static final TableTestProgram ROUND_TRIP = + TableTestProgram.of( + "from-changelog-round-trip", + "FROM_CHANGELOG(TO_CHANGELOG(table)) recovers original table") + .setupTableSource( + SourceTestStep.newBuilder("orders") + .addSchema("id INT", "name STRING") + .addMode(ChangelogMode.all()) + .producedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob"), + Row.ofKind(RowKind.UPDATE_BEFORE, 1, "Alice"), + Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2"), + Row.ofKind(RowKind.DELETE, 2, "Bob")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("id INT", "name STRING") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob"), + Row.ofKind(RowKind.UPDATE_BEFORE, 1, "Alice"), + Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2"), + Row.ofKind(RowKind.DELETE, 2, "Bob")) + .build()) + .setupSql( + "CREATE VIEW changelog_view AS " + + "SELECT * FROM TO_CHANGELOG(input => TABLE orders)") + .runSql( + "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE changelog_view)") + .build(); +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java index 13ca08280b6..be6902a0b17 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java @@ -953,6 +953,21 @@ public class ProcessTableFunctionTestUtils { collectUpdate(ctx, r); } + @Override + public ChangelogMode getChangelogMode(ChangelogContext changelogContext) { + return ChangelogMode.upsert(false); + } + } + + /** Testing 
function that uses row semantics with retract mode (valid). */ + public static class UpdatingRetractRowSemanticFunction + extends ChangelogProcessTableFunctionBase { + public void eval( + Context ctx, + @ArgumentHint({ROW_SEMANTIC_TABLE, SUPPORT_UPDATES, REQUIRE_UPDATE_BEFORE}) Row r) { + collectUpdate(ctx, r); + } + @Override public ChangelogMode getChangelogMode(ChangelogContext changelogContext) { return ChangelogMode.all(); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java new file mode 100644 index 00000000000..33e163d9f41 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.stream.sql; + +import org.apache.flink.table.api.ExplainDetail; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.planner.utils.TableTestBase; +import org.apache.flink.table.planner.utils.TableTestUtil; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; + +/** + * Plan tests for the FROM_CHANGELOG built-in process table function. Uses {@link + * ExplainDetail#CHANGELOG_MODE} to verify changelog mode propagation through the plan. + */ +public class FromChangelogTest extends TableTestBase { + + private static final List<ExplainDetail> CHANGELOG_MODE = + Collections.singletonList(ExplainDetail.CHANGELOG_MODE); + + private TableTestUtil util; + + @BeforeEach + void setup() { + util = streamTestUtil(TableConfig.getDefault()); + } + + @Test + void testInsertOnlySource() { + util.tableEnv() + .executeSql( + "CREATE TABLE cdc_stream (" + + " id INT," + + " op STRING," + + " name STRING" + + ") WITH ('connector' = 'values')"); + util.verifyRelPlan( + "SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream)", CHANGELOG_MODE); + } + + @Test + void testCustomOpMapping() { + util.tableEnv() + .executeSql( + "CREATE TABLE cdc_stream (" + + " id INT," + + " __op STRING," + + " name STRING" + + ") WITH ('connector' = 'values')"); + util.verifyRelPlan( + "SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream, " + + "op => DESCRIPTOR(__op), " + + "op_mapping => MAP['c, r', 'INSERT', 'ub', 'UPDATE_BEFORE', 'ua', 'UPDATE_AFTER', 'd', 'DELETE'])", + CHANGELOG_MODE); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java index c00f76fbb66..17a303b93f8 100644 --- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java @@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.stream.sql; import org.apache.flink.table.annotation.ArgumentHint; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ExplainDetail; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.functions.ProcessTableFunction; @@ -42,6 +43,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctio import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.SetSemanticTablePassThroughFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.TypedRowSemanticTableFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.TypedSetSemanticTableFunction; +import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.UpdatingRetractRowSemanticFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.UpdatingUpsertFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.User; import org.apache.flink.table.planner.utils.TableTestBase; @@ -60,6 +62,7 @@ import java.util.EnumSet; import java.util.Optional; import java.util.stream.Stream; +import static java.util.Collections.singletonList; import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; import static org.apache.flink.table.annotation.ArgumentTrait.OPTIONAL_PARTITION_BY; import static org.apache.flink.table.annotation.ArgumentTrait.PASS_COLUMNS_THROUGH; @@ -282,6 +285,13 @@ public class ProcessTableFunctionTest extends TableTestBase { 
.addInsertSql("INSERT INTO t_sink SELECT * FROM f(r => TABLE t, i => 42)")); } + @Test + void testRetractModeWithRowSemantics() { + util.addTemporarySystemFunction("f", UpdatingRetractRowSemanticFunction.class); + util.verifyRelPlan( + "SELECT * FROM f(r => TABLE t)", singletonList(ExplainDetail.CHANGELOG_MODE)); + } + @ParameterizedTest @MethodSource("errorSpecs") void testErrorBehavior(ErrorSpec spec) { @@ -455,10 +465,10 @@ public class ProcessTableFunctionTest extends TableTestBase { "SELECT * FROM f()", "Table arguments must not be optional."), ErrorSpec.ofSelect( - "no changelog support for tables with row semantics", + "no upsert support for tables with row semantics", InvalidUpdatingSemanticsFunction.class, "SELECT * FROM f(r => TABLE t)", - "PTFs that take table arguments with row semantics don't support updating output. " + "PTFs that take table arguments with row semantics don't support upsert output. " + "Table argument 'r' of function 'f' must use set semantics."), ErrorSpec.ofSelect( "on_time is not supported on updating output", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml new file mode 100644 index 00000000000..2eed936fc6e --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml @@ -0,0 +1,57 @@ +<?xml version="1.0" ?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. 
You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<Root> + <TestCase name="testCustomOpMapping"> + <Resource name="sql"> + <![CDATA[SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream, op => DESCRIPTOR(__op), op_mapping => MAP['c, r', 'INSERT', 'ub', 'UPDATE_BEFORE', 'ua', 'UPDATE_AFTER', 'd', 'DELETE'])]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(id=[$0], name=[$1]) ++- LogicalTableFunctionScan(invocation=[FROM_CHANGELOG(TABLE(#0), DESCRIPTOR(_UTF-16LE'__op'), MAP(_UTF-16LE'c, r':VARCHAR(4) CHARACTER SET "UTF-16LE", _UTF-16LE'INSERT':VARCHAR(13) CHARACTER SET "UTF-16LE", _UTF-16LE'ub':VARCHAR(4) CHARACTER SET "UTF-16LE", _UTF-16LE'UPDATE_BEFORE':VARCHAR(13) CHARACTER SET "UTF-16LE", _UTF-16LE'ua':VARCHAR(4) CHARACTER SET "UTF-16LE", _UTF-16LE'UPDATE_AFTER':VARCHAR(13) CHARACTER SET "UTF-16LE", _UTF-16LE'd':VARCHAR(4) CHARACTER SET "UTF-16LE", _UTF-16 [...] + +- LogicalProject(id=[$0], __op=[$1], name=[$2]) + +- LogicalTableScan(table=[[default_catalog, default_database, cdc_stream]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0), DESCRIPTOR(_UTF-16LE'__op'), MAP(_UTF-16LE'c, r':VARCHAR(4) CHARACTER SET "UTF-16LE", _UTF-16LE'INSERT':VARCHAR(13) CHARACTER SET "UTF-16LE", _UTF-16LE'ub':VARCHAR(4) CHARACTER SET "UTF-16LE", _UTF-16LE'UPDATE_BEFORE':VARCHAR(13) CHARACTER SET "UTF-16LE", _UTF-16LE'ua':VARCHAR(4) CHARACTER SET "UTF-16LE", _UTF-16LE'UPDATE_AFTER':VARCHAR(13) CHARACTER SET "UTF-16LE", _UTF-16LE'd':VARCHAR(4) CHARACTER SET "UTF-16LE", _UTF-16LE'DELE [...] 
++- TableSourceScan(table=[[default_catalog, default_database, cdc_stream]], fields=[id, __op, name], changelogMode=[I]) +]]> + </Resource> + </TestCase> + <TestCase name="testInsertOnlySource"> + <Resource name="sql"> + <![CDATA[SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream)]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(id=[$0], name=[$1]) ++- LogicalTableFunctionScan(invocation=[FROM_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) name)]) + +- LogicalProject(id=[$0], op=[$1], name=[$2]) + +- LogicalTableScan(table=[[default_catalog, default_database, cdc_stream]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], select=[id,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) name)], changelogMode=[I,UB,UA,D]) ++- TableSourceScan(table=[[default_catalog, default_database, cdc_stream]], fields=[id, op, name], changelogMode=[I]) +]]> + </Resource> + </TestCase> +</Root> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml index d89f766b84a..f0f340f4eb7 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml @@ -146,6 +146,26 @@ Calc(select=[out, rowtime]) +- Exchange(distribution=[hash[name]]) +- WatermarkAssigner(rowtime=[ts], watermark=[ts]) +- TableSourceScan(table=[[default_catalog, default_database, t_watermarked]], fields=[name, score, ts]) +]]> + </Resource> + </TestCase> + <TestCase 
name="testRetractModeWithRowSemantics"> + <Resource name="sql"> + <![CDATA[SELECT * FROM f(r => TABLE t)]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(name=[$0], count=[$1], mode=[$2]) ++- LogicalTableFunctionScan(invocation=[f(TABLE(#0), DEFAULT(), DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) name, BIGINT count, VARCHAR(2147483647) mode)]) + +- LogicalProject(name=[$0], score=[$1]) + +- LogicalProject(name=[$0], score=[$1]) + +- LogicalValues(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +ProcessTableFunction(invocation=[f(TABLE(#0), DEFAULT(), DEFAULT())], uid=[null], select=[name,count,mode], rowType=[RecordType(VARCHAR(2147483647) name, BIGINT count, VARCHAR(2147483647) mode)], changelogMode=[I,UB,UA,D]) ++- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]], changelogMode=[I]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java new file mode 100644 index 00000000000..38249aa02bd --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.ptf; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.utils.ProjectedRowData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext; +import org.apache.flink.table.functions.TableSemantics; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.ColumnList; +import org.apache.flink.types.RowKind; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Runtime implementation of {@link BuiltInFunctionDefinitions#FROM_CHANGELOG}. + * + * <p>Converts each append-only input row (which contains an operation code column) back into a + * changelog stream with proper {@link RowKind} annotations. The output schema excludes the + * operation code column and partition key columns (which are prepended by the framework + * automatically). + * + * <p>This is the reverse operation of {@link ToChangelogFunction}. 
+ */ +@Internal +public class FromChangelogFunction extends BuiltInProcessTableFunction<RowData> { + + private static final long serialVersionUID = 1L; + + private static final String DEFAULT_OP_COLUMN_NAME = "op"; + private static final Map<String, RowKind> DEFAULT_OP_MAPPING = + Map.of( + "INSERT", RowKind.INSERT, + "UPDATE_BEFORE", RowKind.UPDATE_BEFORE, + "UPDATE_AFTER", RowKind.UPDATE_AFTER, + "DELETE", RowKind.DELETE); + + private final Map<String, RowKind> rawOpMap; + private final int opColumnIndex; + private final int[] outputIndices; + + private transient HashMap<StringData, RowKind> opMap; + private transient ProjectedRowData projectedOutput; + + public FromChangelogFunction(final SpecializedContext context) { + super(BuiltInFunctionDefinitions.FROM_CHANGELOG, context); + final CallContext callContext = context.getCallContext(); + + final TableSemantics tableSemantics = + callContext + .getTableSemantics(0) + .orElseThrow(() -> new IllegalStateException("Table argument expected.")); + + final RowType inputType = (RowType) tableSemantics.dataType().getLogicalType(); + final String opColumnName = resolveOpColumnName(callContext); + this.opColumnIndex = inputType.getFieldNames().indexOf(opColumnName); + + // Exclude only the op column from output — all other columns pass through + this.outputIndices = + IntStream.range(0, inputType.getFieldCount()) + .filter(i -> i != opColumnIndex) + .toArray(); + + this.rawOpMap = buildOpMap(callContext); + } + + @Override + public void open(final FunctionContext context) throws Exception { + super.open(context); + opMap = new HashMap<>(); + rawOpMap.forEach((code, kind) -> opMap.put(StringData.fromString(code), kind)); + projectedOutput = ProjectedRowData.from(outputIndices); + } + + private static String resolveOpColumnName(final CallContext callContext) { + return callContext + .getArgumentValue(1, ColumnList.class) + .map(cl -> cl.getNames().get(0)) + .orElse(DEFAULT_OP_COLUMN_NAME); + } + + /** + * Builds a 
String-to-RowKind map. Keys in the provided mapping may be comma-separated (e.g., + * "INSERT, UPDATE_AFTER") to map multiple input codes to the same RowKind. + */ + private static Map<String, RowKind> buildOpMap(CallContext callContext) { + return callContext + .getArgumentValue(2, Map.class) + .map(FromChangelogFunction::parseOpMapping) + .orElse(DEFAULT_OP_MAPPING); + } + + private static Map<String, RowKind> parseOpMapping(Map<String, String> opMapping) { + return opMapping.entrySet().stream() + .flatMap( + e -> { + final RowKind kind = RowKind.valueOf(e.getValue().trim()); + return Arrays.stream(e.getKey().split(",")) + .map(code -> Map.entry(code.trim(), kind)); + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + public void eval( + final Context ctx, + final RowData input, + @Nullable final ColumnList op, + @Nullable final MapData opMapping) { + final StringData opCode = input.getString(opColumnIndex); + final RowKind rowKind = opMap.get(opCode); + if (rowKind == null) { + return; + } + + projectedOutput.replaceRow(input); + projectedOutput.setRowKind(rowKind); + collect(projectedOutput); + } +}
