gustavodemorais commented on code in PR #27901: URL: https://github.com/apache/flink/pull/27901#discussion_r3086908022
########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java: ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.DataTypes.Field; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.functions.TableSemantics; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.InputTypeStrategy; +import org.apache.flink.table.types.inference.TypeStrategy; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.types.ColumnList; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** Type strategies for the {@code FROM_CHANGELOG} process table function. */ +@Internal +public final class FromChangelogTypeStrategy { + + private static final String DEFAULT_OP_COLUMN_NAME = "op"; + + private static final Set<String> VALID_ROW_KIND_NAMES = + Set.of("INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE"); + + // -------------------------------------------------------------------------------------------- + // Input validation + // -------------------------------------------------------------------------------------------- + + public static final InputTypeStrategy INPUT_TYPE_STRATEGY = + new ValidationOnlyInputTypeStrategy() { + @Override + public Optional<List<DataType>> inferInputTypes( + final CallContext callContext, final boolean throwOnFailure) { + return validateInputs(callContext, throwOnFailure); + } + }; + + // -------------------------------------------------------------------------------------------- + // Output type inference + // -------------------------------------------------------------------------------------------- + + public static final TypeStrategy OUTPUT_TYPE_STRATEGY = + callContext -> { + final TableSemantics tableSemantics = + callContext + .getTableSemantics(0) + .orElseThrow( + () -> + new ValidationException( + "First argument must be a table for FROM_CHANGELOG.")); + + final String opColumnName = resolveOpColumnName(callContext); + + final List<Field> outputFields = buildOutputFields(tableSemantics, opColumnName); + + return Optional.of(DataTypes.ROW(outputFields).notNull()); + }; + + // -------------------------------------------------------------------------------------------- + // Helpers + // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("rawtypes") + private static Optional<List<DataType>> validateInputs( Review Comment: This function is really long and not so easy to read. Could you try to make it a bit more readable? Ideas: - Either create multiple smaller helper functions that tell what you are validating for each section - Or increase the comments to have create division of what you're checking in each section I think I prefer the first one ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java: ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.DataTypes.Field; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.functions.TableSemantics; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.InputTypeStrategy; +import org.apache.flink.table.types.inference.TypeStrategy; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.types.ColumnList; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** Type strategies for the {@code FROM_CHANGELOG} process table function. */ +@Internal +public final class FromChangelogTypeStrategy { + + private static final String DEFAULT_OP_COLUMN_NAME = "op"; + + private static final Set<String> VALID_ROW_KIND_NAMES = + Set.of("INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE"); + + // -------------------------------------------------------------------------------------------- + // Input validation + // -------------------------------------------------------------------------------------------- + + public static final InputTypeStrategy INPUT_TYPE_STRATEGY = + new ValidationOnlyInputTypeStrategy() { + @Override + public Optional<List<DataType>> inferInputTypes( + final CallContext callContext, final boolean throwOnFailure) { + return validateInputs(callContext, throwOnFailure); + } + }; + + // -------------------------------------------------------------------------------------------- + // Output type inference + // -------------------------------------------------------------------------------------------- + + public static final TypeStrategy OUTPUT_TYPE_STRATEGY = + callContext -> { + final TableSemantics tableSemantics = + callContext + .getTableSemantics(0) + .orElseThrow( + () -> + new ValidationException( + "First argument must be a table for FROM_CHANGELOG.")); + + final String opColumnName = resolveOpColumnName(callContext); + + final List<Field> outputFields = buildOutputFields(tableSemantics, opColumnName); + + return Optional.of(DataTypes.ROW(outputFields).notNull()); + }; + + // -------------------------------------------------------------------------------------------- + // Helpers + // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("rawtypes") + private static Optional<List<DataType>> validateInputs( + final CallContext callContext, final boolean throwOnFailure) { + final boolean isMissingTableArg = callContext.getTableSemantics(0).isEmpty(); + if (isMissingTableArg) { + return callContext.fail( + throwOnFailure, "First argument must be a table for FROM_CHANGELOG."); + } + + final Optional<ColumnList> opDescriptor = callContext.getArgumentValue(1, ColumnList.class); + final boolean hasInvalidOpDescriptor = + opDescriptor.isPresent() && opDescriptor.get().getNames().size() != 1; + if (hasInvalidOpDescriptor) { + return callContext.fail( + throwOnFailure, + "The descriptor for argument 'op' must contain exactly one column name."); + } + + // Validate that the op column exists in the input schema and is of STRING type + final TableSemantics tableSemantics = callContext.getTableSemantics(0).get(); + final String opColumnName = resolveOpColumnName(callContext); + final List<Field> inputFields = DataType.getFields(tableSemantics.dataType()); + final Optional<Field> opField = + inputFields.stream().filter(f -> f.getName().equals(opColumnName)).findFirst(); + if (opField.isEmpty()) { + return callContext.fail( + throwOnFailure, + String.format( + "The op column '%s' does not exist in the input schema.", + opColumnName)); + } + if (!opField.get().getDataType().getLogicalType().is(LogicalTypeFamily.CHARACTER_STRING)) { + return callContext.fail( + throwOnFailure, + String.format( + "The op column '%s' must be of STRING type, but was '%s'.", + opColumnName, opField.get().getDataType().getLogicalType())); + } + + final boolean hasMappingArgProvided = !callContext.isArgumentNull(2); + final boolean isMappingArgLiteral = callContext.isArgumentLiteral(2); + if (hasMappingArgProvided && !isMappingArgLiteral) { + return callContext.fail( + throwOnFailure, "The 'op_mapping' argument must be a constant MAP literal."); + } + + final Optional<Map> opMapping = callContext.getArgumentValue(2, Map.class); + if (opMapping.isPresent()) { + final Map<String, String> mapping = opMapping.get(); + final Optional<List<DataType>> validationError = + validateOpMappingValues(callContext, mapping, throwOnFailure); + if (validationError.isPresent()) { + return validationError; + } + + // Retract mode requires UPDATE_BEFORE in the mapping + final boolean hasUpdateBefore = + mapping.values().stream().anyMatch(v -> "UPDATE_BEFORE".equals(v.trim())); Review Comment: I think this is a bit too simplified. - We support append: a map where we only take the INSERT events - We support also only append and delete, for example - Maybe the check for now should be: if there's an UPDATE_AFTER, there should be also UPDATE_BEFORE ########## docs/content/docs/sql/reference/queries/changelog.md: ########## @@ -30,9 +30,103 @@ Flink SQL provides built-in process table functions (PTFs) for working with chan | Function | Description | |:---------|:------------| +| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with operation codes into a (potentially updating) dynamic table | | [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only table with explicit operation codes | -<!-- Placeholder for future FROM_CHANGELOG function --> +## FROM_CHANGELOG + +The `FROM_CHANGELOG` PTF converts an append-only table with an explicit operation code column into a (potentially updating) dynamic table. Each input row is expected to have a string column that indicates the change operation. The operation column is interpreted by the engine and removed from the output. + +This is useful when consuming Change Data Capture (CDC) streams from systems like Debezium where events arrive as flat append-only records with an explicit operation field. It's also useful to be used in combination with the TO_CHANGELOG function, when converting the append-only table back into an updating table after doing some specific transformation to the events. + +Note: This version requires that your CDC data encodes updates using a full image (i.e. providing separate events for before and after the update). Please double-check whether your source provides both UPDATE_BEFORE and UPDATE_AFTER events. FROM_CHANGELOG is a very powerful function but might produce incorrect results in subsequent operations and tables, if not configured correctly. + +### Syntax + +```sql +SELECT * FROM FROM_CHANGELOG( + input => TABLE source_table, + [op => DESCRIPTOR(op_column_name),] + [op_mapping => MAP[ + 'c, r', 'INSERT', + 'ub', 'UPDATE_BEFORE', + 'ua', 'UPDATE_AFTER', + 'd', 'DELETE' + ]] +) +``` + +### Parameters + +| Parameter | Required | Description | +|:-------------|:---------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `input` | Yes | The input table. Must be append-only. | +| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. The column must exist in the input table and be of type STRING. | +| `op_mapping` | No | A `MAP<STRING, STRING>` mapping user-defined codes to Flink change operation names. Keys are user-defined codes (e.g., `'c'`, `'u'`, `'d'`), values are Flink change operation names (`INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated codes to map multiple codes to the same operation (e.g., `'c, r'`). When provided, only mapped codes are forwarded - unmapped codes are dropped. Each change operation may appear at most once across all entries. | + +#### Default op_mapping + +When `op_mapping` is omitted, the following standard names are used. They allow a reverse conversion from TO_CHANGELOG by default. + +| Input code | Change operation | +|:-------------------|:------------------| +| `'INSERT'` | INSERT | +| `'UPDATE_BEFORE'` | UPDATE_BEFORE | +| `'UPDATE_AFTER'` | UPDATE_AFTER | +| `'DELETE'` | DELETE | + +### Output Schema + +The output contains all input columns except the operation code (e.g., op) column, which is interpreted by Flink's SQL engine and removed. Each output row carries the appropriate change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE). + +``` +[all_input_columns_without_op] +``` + +### Examples + +#### Basic usage with standard op names + +```sql +-- Input (append-only): +-- +I[id:1, op:'INSERT', name:'Alice'] +-- +I[id:1, op:'UPDATE_BEFORE', name:'Alice'] +-- +I[id:1, op:'UPDATE_AFTER', name:'Alice2'] +-- +I[id:2, op:'DELETE', name:'Bob'] + +SELECT * FROM FROM_CHANGELOG( + input => TABLE cdc_stream +) + +-- Output (updating table): +-- +I[id:1, name:'Alice'] +-- -U[id:1, name:'Alice'] +-- +U[id:1, name:'Alice2'] +-- -D[id:2, name:'Bob'] + +-- Table state after all events: +-- | id | name | +-- |----|--------| +-- | 1 | Alice2 | +``` + +#### Custom operation column name + +```sql +-- Source schema: id INT, operation STRING, name STRING +SELECT * FROM FROM_CHANGELOG( + input => TABLE cdc_stream, + op => DESCRIPTOR(operation) +) +-- The operation column named 'operation' is used instead of 'op' +``` + +#### Table API + +```java +// Default: reads 'op' column with standard change operation names +Table result = cdcStream.fromChangelog(); Review Comment: Add the 'asArguments' examples on how to define the params with table api similar to to_changelog https://github.com/apache/flink/blob/fc27f81ca8f32d0cc043e1c442edd4ab717c09d2/docs/content/docs/sql/reference/queries/changelog.md#L146 ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +/** {@link TableTestProgram} definitions for testing the built-in FROM_CHANGELOG PTF. */ +public class FromChangelogTestPrograms { + + private static final SourceTestStep SIMPLE_CDC_SOURCE = + SourceTestStep.newBuilder("cdc_stream") + .addSchema("id INT", "op STRING", "name STRING") + .producedValues(Row.of(1, "INSERT", "Alice")) + .build(); + + // -------------------------------------------------------------------------------------------- + // SQL tests + // -------------------------------------------------------------------------------------------- + + public static final TableTestProgram DEFAULT_OP_MAPPING = + TableTestProgram.of( + "from-changelog-default-op-mapping", + "default mapping with standard op names") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema("id INT", "op STRING", "name STRING") + .producedValues( + Row.of(1, "INSERT", "Alice"), + Row.of(2, "INSERT", "Bob"), + Row.of(1, "UPDATE_BEFORE", "Alice"), + Row.of(1, "UPDATE_AFTER", "Alice2"), + Row.of(2, "DELETE", "Bob")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema( + "id INT NOT NULL", + "name STRING", + "PRIMARY KEY (id) NOT ENFORCED") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob"), + Row.ofKind(RowKind.UPDATE_BEFORE, 1, "Alice"), + Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2"), + Row.ofKind(RowKind.DELETE, 2, "Bob")) + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream)") + .build(); + + public static final TableTestProgram DEBEZIUM_MAPPING = + TableTestProgram.of( + "from-changelog-custom-mapping", + "custom op_mapping with comma-separated keys") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema("id INT", "__op STRING", "name STRING") + .producedValues( + Row.of(1, "c", "Alice"), + Row.of(2, "r", "Bob"), + Row.of(1, "ub", "Alice"), + Row.of(1, "ua", "Alice2"), + Row.of(2, "d", "Bob")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema( + "id INT NOT NULL", + "name STRING", + "PRIMARY KEY (id) NOT ENFORCED") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob"), + Row.ofKind(RowKind.UPDATE_BEFORE, 1, "Alice"), + Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2"), + Row.ofKind(RowKind.DELETE, 2, "Bob")) + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream, " + + "op => DESCRIPTOR(__op), " + + "op_mapping => MAP['c, r', 'INSERT', 'ub', 'UPDATE_BEFORE', 'ua', 'UPDATE_AFTER', 'd', 'DELETE'])") + .build(); + + public static final TableTestProgram UNMAPPED_CODES_DROPPED = + TableTestProgram.of( + "from-changelog-unmapped-codes-dropped", + "unmapped op codes are silently dropped") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema("id INT", "op STRING", "name STRING") + .producedValues( + Row.of(1, "INSERT", "Alice"), + Row.of(2, "INSERT", "Bob"), + Row.of(1, "UNKNOWN", "Alice2"), + Row.of(2, "DELETE", "Bob")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema( + "id INT NOT NULL", + "name STRING", + "PRIMARY KEY (id) NOT ENFORCED") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob"), + Row.ofKind(RowKind.DELETE, 2, "Bob")) + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream)") + .build(); + + /** Custom op column name via DESCRIPTOR. */ + public static final TableTestProgram CUSTOM_OP_NAME = + TableTestProgram.of( + "from-changelog-custom-op-name", "custom op column name via DESCRIPTOR") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema("id INT", "operation STRING", "name STRING") + .producedValues( + Row.of(1, "INSERT", "Alice"), + Row.of(1, "UPDATE_BEFORE", "Alice"), + Row.of(1, "UPDATE_AFTER", "Alice2")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema( + "id INT NOT NULL", + "name STRING", + "PRIMARY KEY (id) NOT ENFORCED") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.UPDATE_BEFORE, 1, "Alice"), + Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2")) + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream, " + + "op => DESCRIPTOR(operation))") + .build(); + + // -------------------------------------------------------------------------------------------- + // Table API test + // -------------------------------------------------------------------------------------------- + + public static final TableTestProgram TABLE_API_DEFAULT = + TableTestProgram.of( + "from-changelog-table-api-default", + "Table.fromChangelog() convenience method") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema("id INT", "op STRING", "name STRING") + .producedValues( + Row.of(1, "INSERT", "Alice"), + Row.of(2, "INSERT", "Bob")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema( + "id INT NOT NULL", + "name STRING", + "PRIMARY KEY (id) NOT ENFORCED") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob")) + .build()) + .runTableApi(env -> env.from("cdc_stream").fromChangelog(), "sink") + .build(); + + // -------------------------------------------------------------------------------------------- + // Round-trip test: FROM_CHANGELOG(TO_CHANGELOG(table)) + // -------------------------------------------------------------------------------------------- + + /** + * Verifies that FROM_CHANGELOG(TO_CHANGELOG(table)) recovers the original dynamic table. + * Requires TO_CHANGELOG with row semantics (PR #27911). + */ + public static final TableTestProgram ROUND_TRIP = Review Comment: Nice! ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +/** {@link TableTestProgram} definitions for testing the built-in FROM_CHANGELOG PTF. */ +public class FromChangelogTestPrograms { + + private static final SourceTestStep SIMPLE_CDC_SOURCE = + SourceTestStep.newBuilder("cdc_stream") + .addSchema("id INT", "op STRING", "name STRING") + .producedValues(Row.of(1, "INSERT", "Alice")) + .build(); + + // -------------------------------------------------------------------------------------------- + // SQL tests + // -------------------------------------------------------------------------------------------- + + public static final TableTestProgram DEFAULT_OP_MAPPING = + TableTestProgram.of( + "from-changelog-default-op-mapping", + "default mapping with standard op names") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema("id INT", "op STRING", "name STRING") + .producedValues( + Row.of(1, "INSERT", "Alice"), + Row.of(2, "INSERT", "Bob"), + Row.of(1, "UPDATE_BEFORE", "Alice"), + Row.of(1, "UPDATE_AFTER", "Alice2"), + Row.of(2, "DELETE", "Bob")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema( + "id INT NOT NULL", + "name STRING", + "PRIMARY KEY (id) NOT ENFORCED") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob"), + Row.ofKind(RowKind.UPDATE_BEFORE, 1, "Alice"), + Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2"), + Row.ofKind(RowKind.DELETE, 2, "Bob")) + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream)") + .build(); + + public static final TableTestProgram DEBEZIUM_MAPPING = + TableTestProgram.of( + "from-changelog-custom-mapping", + "custom op_mapping with comma-separated keys") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema("id INT", "__op STRING", "name STRING") + .producedValues( + Row.of(1, "c", "Alice"), + Row.of(2, "r", "Bob"), + Row.of(1, "ub", "Alice"), + Row.of(1, "ua", "Alice2"), + Row.of(2, "d", "Bob")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema( + "id INT NOT NULL", + "name STRING", + "PRIMARY KEY (id) NOT ENFORCED") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob"), + Row.ofKind(RowKind.UPDATE_BEFORE, 1, "Alice"), + Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2"), + Row.ofKind(RowKind.DELETE, 2, "Bob")) + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream, " + + "op => DESCRIPTOR(__op), " + + "op_mapping => MAP['c, r', 'INSERT', 'ub', 'UPDATE_BEFORE', 'ua', 'UPDATE_AFTER', 'd', 'DELETE'])") + .build(); + + public static final TableTestProgram UNMAPPED_CODES_DROPPED = + TableTestProgram.of( + "from-changelog-unmapped-codes-dropped", + "unmapped op codes are silently dropped") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema("id INT", "op STRING", "name STRING") + .producedValues( + Row.of(1, "INSERT", "Alice"), + Row.of(2, "INSERT", "Bob"), + Row.of(1, "UNKNOWN", "Alice2"), + Row.of(2, "DELETE", "Bob")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema( + "id INT NOT NULL", + "name STRING", + "PRIMARY KEY (id) NOT ENFORCED") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob"), + Row.ofKind(RowKind.DELETE, 2, "Bob")) + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream)") + .build(); + + /** Custom op column name via DESCRIPTOR. */ + public static final TableTestProgram CUSTOM_OP_NAME = + TableTestProgram.of( + "from-changelog-custom-op-name", "custom op column name via DESCRIPTOR") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema("id INT", "operation STRING", "name STRING") + .producedValues( + Row.of(1, "INSERT", "Alice"), + Row.of(1, "UPDATE_BEFORE", "Alice"), + Row.of(1, "UPDATE_AFTER", "Alice2")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema( + "id INT NOT NULL", + "name STRING", + "PRIMARY KEY (id) NOT ENFORCED") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.UPDATE_BEFORE, 1, "Alice"), + Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2")) + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream, " + + "op => DESCRIPTOR(operation))") + .build(); + + // -------------------------------------------------------------------------------------------- + // Table API test + // -------------------------------------------------------------------------------------------- + + public static final TableTestProgram TABLE_API_DEFAULT = + TableTestProgram.of( + "from-changelog-table-api-default", + "Table.fromChangelog() convenience method") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema("id INT", "op STRING", "name STRING") + .producedValues( + Row.of(1, "INSERT", "Alice"), + Row.of(2, "INSERT", "Bob")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema( + "id INT NOT NULL", + "name STRING", + "PRIMARY KEY (id) NOT ENFORCED") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob")) + .build()) + .runTableApi(env -> env.from("cdc_stream").fromChangelog(), "sink") + .build(); + + // -------------------------------------------------------------------------------------------- + // Round-trip test: FROM_CHANGELOG(TO_CHANGELOG(table)) + // -------------------------------------------------------------------------------------------- + + /** + * Verifies that FROM_CHANGELOG(TO_CHANGELOG(table)) recovers the original dynamic table. + * Requires TO_CHANGELOG with row semantics (PR #27911). Review Comment: It's merged ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java: ########## @@ -399,6 +416,16 @@ public Builder sqlName(String name) { return this; } + /** + * Sets a resolver that determines the output {@link ChangelogMode} for this built-in PTF. + * Only needed for PTFs that emit updates (e.g., FROM_CHANGELOG). Review Comment: ```suggestion * Sets a resolver that determines the output {@link ChangelogMode} for this built-in function. * Only needed for functions that emit updates (e.g., FROM_CHANGELOG). ``` We're adding this to the BuiltInFunctionDefinition this is theoretically not PTF specific ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +/** {@link TableTestProgram} definitions for testing the built-in FROM_CHANGELOG PTF. */ +public class FromChangelogTestPrograms { + + private static final SourceTestStep SIMPLE_CDC_SOURCE = + SourceTestStep.newBuilder("cdc_stream") + .addSchema("id INT", "op STRING", "name STRING") + .producedValues(Row.of(1, "INSERT", "Alice")) + .build(); + + // -------------------------------------------------------------------------------------------- + // SQL tests + // -------------------------------------------------------------------------------------------- + + public static final TableTestProgram DEFAULT_OP_MAPPING = + TableTestProgram.of( + "from-changelog-default-op-mapping", + "default mapping with standard op names") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema("id INT", "op STRING", "name STRING") + .producedValues( + Row.of(1, "INSERT", "Alice"), + Row.of(2, "INSERT", "Bob"), + Row.of(1, "UPDATE_BEFORE", "Alice"), + Row.of(1, "UPDATE_AFTER", "Alice2"), + Row.of(2, "DELETE", "Bob")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema( + "id INT NOT NULL", + "name STRING", + "PRIMARY KEY (id) NOT ENFORCED") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob"), + Row.ofKind(RowKind.UPDATE_BEFORE, 1, "Alice"), + Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2"), + Row.ofKind(RowKind.DELETE, 2, "Bob")) + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream)") + .build(); + + public static final TableTestProgram DEBEZIUM_MAPPING = + TableTestProgram.of( + "from-changelog-custom-mapping", + "custom op_mapping with comma-separated keys") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema("id INT", "__op STRING", "name STRING") Review Comment: You can drop the "__op" and just use "op" everywhere in these tests. Debezium also uses "op" ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java: ########## @@ -918,6 +918,19 @@ public void eval(Context ctx, @ArgumentHint({ROW_SEMANTIC_TABLE, SUPPORT_UPDATES collectUpdate(ctx, r); } + @Override + public ChangelogMode getChangelogMode(ChangelogContext changelogContext) { + return ChangelogMode.upsert(false); + } + } + + /** Testing function that uses row semantics with retract mode (valid). */ + public static class UpdatingRetractRowSemanticFunction + extends ChangelogProcessTableFunctionBase { + public void eval(Context ctx, @ArgumentHint({ROW_SEMANTIC_TABLE, SUPPORT_UPDATES}) Row r) { Review Comment: Did you want to do this? ```suggestion public void eval(Context ctx, @ArgumentHint({ROW_SEMANTIC_TABLE, SUPPORT_UPDATES, REQUIRE_UPDATE_BEFORE}) Row r) { ``` SUPPORT_UPDATES can be both upsert and retract -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
