This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new bdb4f71fcba [FLINK-39259][table] Support basic TO_CHANGELOG built-in process table function
bdb4f71fcba is described below
commit bdb4f71fcba92a59e5f74f9e2362063328343e88
Author: Gustavo de Morais <[email protected]>
AuthorDate: Fri Mar 20 09:18:24 2026 +0100
[FLINK-39259][table] Support basic TO_CHANGELOG built-in process table function
This closes #27777.
---
.../docs/sql/reference/queries/changelog.md | 138 ++++++++
.../apache/flink/table/api/PartitionedTable.java | 28 ++
.../apache/flink/table/api/internal/TableImpl.java | 5 +
.../functions/BuiltInFunctionDefinitions.java | 27 ++
.../strategies/SpecificInputTypeStrategies.java | 4 +
.../strategies/SpecificTypeStrategies.java | 4 +
.../strategies/ToChangelogTypeStrategy.java | 171 ++++++++++
.../inference/CallBindingCallContext.java | 58 +++-
.../inference/OperatorBindingCallContext.java | 55 +++-
.../exec/stream/ToChangelogSemanticTests.java | 53 +++
.../nodes/exec/stream/ToChangelogTestPrograms.java | 365 +++++++++++++++++++++
.../planner/plan/stream/sql/ToChangelogTest.java | 93 ++++++
.../planner/plan/stream/sql/ToChangelogTest.xml | 57 ++++
.../flink/table/planner/utils/TableTestBase.scala | 5 +
.../functions/ptf/BuiltInProcessTableFunction.java | 133 ++++++++
.../runtime/functions/ptf/ToChangelogFunction.java | 135 ++++++++
16 files changed, 1314 insertions(+), 17 deletions(-)
diff --git a/docs/content/docs/sql/reference/queries/changelog.md
b/docs/content/docs/sql/reference/queries/changelog.md
new file mode 100644
index 00000000000..d7e6e29668a
--- /dev/null
+++ b/docs/content/docs/sql/reference/queries/changelog.md
@@ -0,0 +1,138 @@
+---
+title: "Changelog Conversion"
+weight: 8
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Changelog Conversion
+
+{{< label Streaming >}}
+
+Flink SQL provides built-in process table functions (PTFs) for working with changelog streams.
+
+| Function | Description |
+|:---------|:------------|
+| [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only table with explicit operation codes |
+
+<!-- Placeholder for future FROM_CHANGELOG function -->
+
+## TO_CHANGELOG
+
+The `TO_CHANGELOG` PTF converts a dynamic table, which may contain updates and deletes, into an append-only table with an explicit operation code column. Each input row, regardless of its original `RowKind` (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE), is emitted as an INSERT row with a string column indicating the original operation.
+
+This is useful when you need to materialize changelog events into a downstream system that only supports appends (e.g., a message queue, log store, or append-only file sink). Combined with `op_mapping`, it can also filter out certain kinds of changes, for example DELETEs.
+
+### Syntax
+
+```sql
+SELECT * FROM TO_CHANGELOG(
+ input => TABLE source_table PARTITION BY key_col,
+ [op => DESCRIPTOR(op_column_name),]
+ [op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...]]
+)
+```
+
+### Parameters
+
+| Parameter | Required | Description |
+|:-------------|:---------|:------------|
+| `input` | Yes | The input table. Must include `PARTITION BY` for parallel execution. Accepts insert-only, retract, and upsert tables. |
+| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. |
+| `op_mapping` | No | A constant `MAP<STRING, STRING>` literal mapping `RowKind` names (`INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`) to custom output codes. When provided, only mapped RowKinds are forwarded; unmapped events are dropped. |
+
+#### Default op_mapping
+
+When `op_mapping` is omitted, all four RowKinds are mapped to their standard names:
+
+| RowKind | Output value |
+|:----------------|:------------------|
+| INSERT | `'INSERT'` |
+| UPDATE_BEFORE | `'UPDATE_BEFORE'` |
+| UPDATE_AFTER | `'UPDATE_AFTER'` |
+| DELETE | `'DELETE'` |
+
+### Output Schema
+
+The output columns are ordered as:
+
+```
+[partition_key_columns, op_column, remaining_columns]
+```
+
+All output rows have RowKind `INSERT`; the table is always append-only.
+
+### Examples
+
+#### Basic usage
+
+```sql
+-- Input: retract table from an aggregation
+-- +I[id:1, name:'Alice', cnt:1]
+-- -U[id:1, name:'Alice', cnt:1]
+-- +U[id:1, name:'Alice', cnt:2]
+-- -D[id:2, name:'Bob', cnt:1]
+
+SELECT * FROM TO_CHANGELOG(
+ input => TABLE my_aggregation PARTITION BY id
+)
+
+-- Output (append-only):
+-- +I[id:1, op:'INSERT', name:'Alice', cnt:1]
+-- +I[id:1, op:'UPDATE_BEFORE', name:'Alice', cnt:1]
+-- +I[id:1, op:'UPDATE_AFTER', name:'Alice', cnt:2]
+-- +I[id:2, op:'DELETE', name:'Bob', cnt:1]
+```
+
+#### Custom operation column name
+
+```sql
+SELECT * FROM TO_CHANGELOG(
+ input => TABLE my_aggregation PARTITION BY id,
+ op => DESCRIPTOR(operation)
+)
+-- The op column is now named 'operation' instead of 'op'
+```
+
+#### Custom operation codes with filtering
+
+```sql
+SELECT * FROM TO_CHANGELOG(
+ input => TABLE my_aggregation PARTITION BY id,
+ op => DESCRIPTOR(op_code),
+ op_mapping => MAP['INSERT', 'I', 'UPDATE_AFTER', 'U']
+)
+-- Only INSERT and UPDATE_AFTER events are forwarded
+-- UPDATE_BEFORE and DELETE events are dropped (not in the mapping)
+-- op_code values are 'I' and 'U' instead of full names
+```
+
+#### Table API
+
+```java
+import static org.apache.flink.table.api.Expressions.*;
+
+// Default: adds 'op' column and supports all changelog modes
+Table result = myTable.partitionBy($("id")).toChangelog();
+
+// With custom parameters
+Table customResult = myTable.partitionBy($("id")).toChangelog(
+ descriptor("op_code").asArgument("op"),
+ map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
+);
+```
+
+{{< top >}}
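The `op_mapping` rules documented above can be sketched as standalone Java. This is a hypothetical illustration: the class `OpMappingSketch` and its local `Kind` enum are invented for this example and do not depend on Flink, and the sketch mirrors the documented behavior rather than the code in `ToChangelogFunction`. The default mapping maps each RowKind to its own name, keys outside the four RowKind names are rejected (the same key set `ToChangelogTypeStrategy` checks), and unmapped kinds are dropped.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Hypothetical, Flink-free sketch of the documented op_mapping semantics. */
public final class OpMappingSketch {

    /** Local stand-in for Flink's RowKind; only the names matter here. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Same key set that ToChangelogTypeStrategy accepts for op_mapping. */
    static final Set<String> VALID_KEYS =
            Set.of("INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE");

    /** Default mapping used when op_mapping is omitted: every kind maps to its own name. */
    static Map<String, String> defaultMapping() {
        Map<String, String> mapping = new LinkedHashMap<>();
        for (Kind kind : Kind.values()) {
            mapping.put(kind.name(), kind.name());
        }
        return mapping;
    }

    /** Rejects keys that are not RowKind names. */
    static void validate(Map<String, String> mapping) {
        for (String key : mapping.keySet()) {
            if (!VALID_KEYS.contains(key)) {
                throw new IllegalArgumentException("Invalid key in 'op_mapping': " + key);
            }
        }
    }

    /** Op code to emit for a row of the given kind, or empty if the row is dropped. */
    static Optional<String> opCode(Kind kind, Map<String, String> mapping) {
        return Optional.ofNullable(mapping.get(kind.name()));
    }

    public static void main(String[] args) {
        Map<String, String> custom = Map.of("INSERT", "I", "UPDATE_AFTER", "U");
        validate(custom);
        for (Kind kind : Kind.values()) {
            // Prints I, (dropped), U, (dropped) for the four kinds in declaration order
            System.out.println(kind + " -> " + opCode(kind, custom).orElse("(dropped)"));
        }
    }
}
```

Returning `Optional` keeps "drop this row" explicit instead of overloading `null` as a sentinel op code.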
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
index 32180e51fb9..8d5f1c91b28 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.annotation.ArgumentTrait;
+import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
@@ -138,4 +139,31 @@ public interface PartitionedTable {
* @see ProcessTableFunction
*/
Table process(Class<? extends UserDefinedFunction> function, Object... arguments);
+
+ /**
+ * Converts this dynamic table into an append-only table with an explicit operation code column
+ * using the built-in {@code TO_CHANGELOG} process table function.
+ *
+ * <p>Each input row, regardless of its original RowKind, is emitted as an INSERT row (unless it
+ * is filtered out by {@code op_mapping}) with a string {@code op} column indicating the original
+ * operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE).
+ *
+ * <p>Optional arguments can be passed using named expressions:
+ *
+ * <pre>{@code
+ * // Default: adds 'op' column and supports all changelog modes
+ * table.partitionBy($("id")).toChangelog();
+ *
+ * // Custom op column name and mapping
+ * table.partitionBy($("id")).toChangelog(
+ * descriptor("op_code").asArgument("op"),
+ * map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
+ * );
+ * }</pre>
+ *
+ * @param arguments optional named arguments for {@code op} and {@code op_mapping}
+ * @return an append-only {@link Table} with an {@code op} column prepended to the non-partition
+ * columns
+ */
+ Table toChangelog(Expression... arguments);
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
index 581dadba4da..335f28f2f8e 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
@@ -901,6 +901,11 @@ public class TableImpl implements Table {
createPartitionQueryOperation(),
table.tableEnvironment, arguments));
}
+ @Override
+ public Table toChangelog(Expression... arguments) {
+ return process(BuiltInFunctionDefinitions.TO_CHANGELOG.getName(), (Object[]) arguments);
+ }
+
private QueryOperation createPartitionQueryOperation() {
return table.operationTreeBuilder.partition(partitionKeys,
table.operationTree);
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 6685be8fdbd..b03988321e1 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -109,12 +109,14 @@ import static
org.apache.flink.table.types.inference.strategies.SpecificInputTyp
import static
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.INDEX;
import static
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.JSON_ARGUMENT;
import static
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ML_PREDICT_INPUT_TYPE_STRATEGY;
+import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TO_CHANGELOG_INPUT_TYPE_STRATEGY;
import static
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TWO_EQUALS_COMPARABLE;
import static
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TWO_FULLY_COMPARABLE;
import static
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.percentage;
import static
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.percentageArray;
import static
org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.ARRAY_APPEND_PREPEND;
import static
org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.ML_PREDICT_OUTPUT_TYPE_STRATEGY;
+import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.TO_CHANGELOG_OUTPUT_TYPE_STRATEGY;
/** Dictionary of function definitions for all built-in functions. */
@PublicEvolving
@@ -755,6 +757,31 @@ public final class BuiltInFunctionDefinitions {
.runtimeProvided()
.build();
+ public static final BuiltInFunctionDefinition TO_CHANGELOG =
+ BuiltInFunctionDefinition.newBuilder()
+ .name("TO_CHANGELOG")
+ .kind(PROCESS_TABLE)
+ .staticArguments(
+ StaticArgument.table(
+ "input",
+ Row.class,
+ false,
+ EnumSet.of(
+ StaticArgumentTrait.TABLE,
+ StaticArgumentTrait.SET_SEMANTIC_TABLE,
+ StaticArgumentTrait.SUPPORT_UPDATES,
+ StaticArgumentTrait.REQUIRE_UPDATE_BEFORE)),
+ StaticArgument.scalar("op", DataTypes.DESCRIPTOR(), true),
+ StaticArgument.scalar(
+ "op_mapping",
+ DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
+ true))
+ .inputTypeStrategy(TO_CHANGELOG_INPUT_TYPE_STRATEGY)
+ .outputTypeStrategy(TO_CHANGELOG_OUTPUT_TYPE_STRATEGY)
+ .runtimeClass(
+ "org.apache.flink.table.runtime.functions.ptf.ToChangelogFunction")
+ .build();
+
public static final BuiltInFunctionDefinition GREATEST =
BuiltInFunctionDefinition.newBuilder()
.name("GREATEST")
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
index 9206985aa13..4455b083ce2 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
@@ -122,6 +122,10 @@ public final class SpecificInputTypeStrategies {
public static final InputTypeStrategy ML_PREDICT_INPUT_TYPE_STRATEGY =
MLPredictTypeStrategy.ML_PREDICT_INPUT_TYPE_STRATEGY;
+ /** Input strategy for {@link BuiltInFunctionDefinitions#TO_CHANGELOG}. */
+ public static final InputTypeStrategy TO_CHANGELOG_INPUT_TYPE_STRATEGY =
+ ToChangelogTypeStrategy.INPUT_TYPE_STRATEGY;
+
/** See {@link ExtractInputTypeStrategy}. */
public static final InputTypeStrategy EXTRACT = new
ExtractInputTypeStrategy();
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java
index 79e49de78cc..c1b854cad7c 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java
@@ -198,6 +198,10 @@ public final class SpecificTypeStrategies {
public static final TypeStrategy ML_PREDICT_OUTPUT_TYPE_STRATEGY =
MLPredictTypeStrategy.ML_PREDICT_OUTPUT_TYPE_STRATEGY;
+ /** Type strategy specific for {@link BuiltInFunctionDefinitions#TO_CHANGELOG}. */
+ public static final TypeStrategy TO_CHANGELOG_OUTPUT_TYPE_STRATEGY =
+ ToChangelogTypeStrategy.OUTPUT_TYPE_STRATEGY;
+
private SpecificTypeStrategies() {
// no instantiation
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
new file mode 100644
index 00000000000..cb04e720209
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.DataTypes.Field;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.TableSemantics;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.Signature.Argument;
+import org.apache.flink.table.types.inference.TypeStrategy;
+import org.apache.flink.types.ColumnList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/** Type strategies for the {@code TO_CHANGELOG} process table function. */
+@Internal
+public final class ToChangelogTypeStrategy {
+
+ private static final String DEFAULT_OP_COLUMN_NAME = "op";
+
+ private static final Set<String> VALID_ROW_KIND_NAMES =
+ Set.of("INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE");
+
+ // --------------------------------------------------------------------------------------------
+ // Input validation
+ // --------------------------------------------------------------------------------------------
+
+ public static final InputTypeStrategy INPUT_TYPE_STRATEGY =
+ new InputTypeStrategy() {
+ @Override
+ public ArgumentCount getArgumentCount() {
+ return ConstantArgumentCount.between(1, 3);
+ }
+
+ @Override
+ public Optional<List<DataType>> inferInputTypes(
+ final CallContext callContext, final boolean throwOnFailure) {
+ return validateInputs(callContext, throwOnFailure);
+ }
+
+ @Override
+ public List<Signature> getExpectedSignatures(final FunctionDefinition definition) {
+ return List.of(
+ Signature.of(Argument.of("input", "TABLE")),
+ Signature.of(
+ Argument.of("input", "TABLE"), Argument.of("op", "DESCRIPTOR")),
+ Signature.of(
+ Argument.of("input", "TABLE"),
+ Argument.of("op", "DESCRIPTOR"),
+ Argument.of("op_mapping", "MAP<STRING, STRING>")));
+ }
+ };
+
+ // --------------------------------------------------------------------------------------------
+ // Output type inference
+ // --------------------------------------------------------------------------------------------
+
+ public static final TypeStrategy OUTPUT_TYPE_STRATEGY =
+ callContext -> {
+ final TableSemantics semantics =
+ callContext
+ .getTableSemantics(0)
+ .orElseThrow(
+ () ->
+ new ValidationException(
+ "First argument must be a table for TO_CHANGELOG."));
+
+ final String opColumnName = resolveOpColumnName(callContext);
+ final List<Field> inputFields = DataType.getFields(semantics.dataType());
+ final Set<Integer> partitionKeys = intArrayToSet(semantics.partitionByColumns());
+
+ final List<Field> outputFields = new ArrayList<>();
+ outputFields.add(DataTypes.FIELD(opColumnName, DataTypes.STRING()));
+ for (int i = 0; i < inputFields.size(); i++) {
+ if (!partitionKeys.contains(i)) {
+ outputFields.add(inputFields.get(i));
+ }
+ }
+
+ return Optional.of(DataTypes.ROW(outputFields).notNull());
+ };
+
+ // --------------------------------------------------------------------------------------------
+ // Helpers
+ // --------------------------------------------------------------------------------------------
+
+ private static Optional<List<DataType>> validateInputs(
+ final CallContext callContext, final boolean throwOnFailure) {
+ final boolean isMissingTableArg = callContext.getTableSemantics(0).isEmpty();
+ if (isMissingTableArg) {
+ return callContext.fail(
+ throwOnFailure, "First argument must be a table for TO_CHANGELOG.");
+ }
+
+ final Optional<ColumnList> opDescriptor = callContext.getArgumentValue(1, ColumnList.class);
+ final boolean hasInvalidOpDescriptor =
+ opDescriptor.isPresent() && opDescriptor.get().getNames().size() != 1;
+ if (hasInvalidOpDescriptor) {
+ return callContext.fail(
+ throwOnFailure,
+ "The descriptor for argument 'op' must contain exactly one column name.");
+ }
+
+ final boolean hasMappingArgProvided = !callContext.isArgumentNull(2);
+ final boolean isMappingArgLiteral = callContext.isArgumentLiteral(2);
+ if (hasMappingArgProvided && !isMappingArgLiteral) {
+ return callContext.fail(
+ throwOnFailure, "The 'op_mapping' argument must be a constant MAP literal.");
+ }
+
+ final Optional<Map> opMapping = callContext.getArgumentValue(2, Map.class);
+ if (opMapping.isPresent()) {
+ final boolean hasInvalidMappingKey =
+ opMapping.get().keySet().stream()
+ .anyMatch(
+ key ->
+ !(key instanceof String)
+ || !VALID_ROW_KIND_NAMES.contains(key));
+ if (hasInvalidMappingKey) {
+ return callContext.fail(
+ throwOnFailure, "Invalid target mapping for argument 'op_mapping'.");
+ }
+ }
+
+ return Optional.of(callContext.getArgumentDataTypes());
+ }
+
+ private static String resolveOpColumnName(final CallContext callContext) {
+ return callContext
+ .getArgumentValue(1, ColumnList.class)
+ .filter(cl -> !cl.getNames().isEmpty())
+ .map(cl -> cl.getNames().get(0))
+ .orElse(DEFAULT_OP_COLUMN_NAME);
+ }
+
+ private static Set<Integer> intArrayToSet(final int[] array) {
+ return IntStream.of(array).boxed().collect(Collectors.toSet());
+ }
+
+ private ToChangelogTypeStrategy() {}
+}
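To make the column order concrete, here is a hypothetical sketch over column names only (the class `ToChangelogSchemaSketch` and its methods are invented for illustration). `declaredColumns` follows `OUTPUT_TYPE_STRATEGY` above: the op column first, then every input column whose index is not in `partitionByColumns()`. `visibleColumns` then puts the partition keys in front; that step is an assumption based on the documented `[partition_key_columns, op_column, remaining_columns]` order, not on code shown in this commit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical sketch of the TO_CHANGELOG output column order, using column names only. */
public final class ToChangelogSchemaSketch {

    /** What OUTPUT_TYPE_STRATEGY declares: the op column, then every non-partition input column. */
    static List<String> declaredColumns(List<String> input, int[] partitionBy, String opColumn) {
        Set<Integer> keys = Arrays.stream(partitionBy).boxed().collect(Collectors.toSet());
        List<String> out = new ArrayList<>();
        out.add(opColumn);
        for (int i = 0; i < input.size(); i++) {
            if (!keys.contains(i)) {
                out.add(input.get(i));
            }
        }
        return out;
    }

    /** Visible schema, ASSUMING the planner prepends the partition keys in PARTITION BY order. */
    static List<String> visibleColumns(List<String> input, int[] partitionBy, String opColumn) {
        List<String> out = new ArrayList<>();
        for (int key : partitionBy) {
            out.add(input.get(key));
        }
        out.addAll(declaredColumns(input, partitionBy, opColumn));
        return out;
    }

    public static void main(String[] args) {
        List<String> input = List.of("id", "name", "cnt");
        System.out.println(declaredColumns(input, new int[] {0}, "op")); // [op, name, cnt]
        System.out.println(visibleColumns(input, new int[] {0}, "op")); // [id, op, name, cnt]
    }
}
```

Excluding the partition keys from the declared type avoids emitting them twice once they are prepended.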
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
index 4aac8bcea18..e5968ce21fa 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
@@ -49,7 +49,9 @@ import org.apache.calcite.sql.type.SqlTypeName;
import javax.annotation.Nullable;
import java.util.AbstractList;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -114,9 +116,10 @@ public final class CallBindingCallContext extends
AbstractSqlCallContext {
@Override
public boolean isArgumentLiteral(int pos) {
final SqlNode sqlNode = adaptedArguments.get(pos);
- // Semantically a descriptor can be considered a literal,
- // however, Calcite represents them as a call
- return SqlUtil.isLiteral(sqlNode, false) || sqlNode.getKind() == SqlKind.DESCRIPTOR;
+ // Calcite represents DESCRIPTOR and MAP constructors as calls, not literals
+ return SqlUtil.isLiteral(sqlNode, false)
+ || sqlNode.getKind() == SqlKind.DESCRIPTOR
+ || isLiteralMap(sqlNode);
}
@Override
@@ -136,6 +139,8 @@ public final class CallBindingCallContext extends
AbstractSqlCallContext {
}
try {
final SqlNode sqlNode = adaptedArguments.get(pos);
+ // Calcite represents DESCRIPTOR and MAP constructors as SqlCall, not SqlLiteral.
+ // Handle them explicitly by extracting values from the call operands.
if (sqlNode.getKind() == SqlKind.DESCRIPTOR && clazz == ColumnList.class) {
final List<SqlNode> columns = ((SqlCall)
sqlNode).getOperandList();
if (columns.stream()
@@ -147,6 +152,9 @@ public final class CallBindingCallContext extends
AbstractSqlCallContext {
}
return Optional.of((T) convertColumnList(columns));
}
+ if (sqlNode.getKind() == SqlKind.MAP_VALUE_CONSTRUCTOR && clazz == Map.class) {
+ return Optional.ofNullable((T) convertMap((SqlCall) sqlNode));
+ }
final SqlLiteral literal = SqlLiteral.unchain(sqlNode);
return Optional.ofNullable(getLiteralValueAs(literal::getValueAs, clazz));
} catch (IllegalArgumentException e) {
@@ -354,4 +362,48 @@ public final class CallBindingCallContext extends
AbstractSqlCallContext {
.collect(Collectors.toList());
return ColumnList.of(names);
}
+
+ private static @Nullable Map<String, String> convertMap(SqlCall mapCall) {
+ final List<SqlNode> operands = mapCall.getOperandList();
+ final Map<String, String> map = new LinkedHashMap<>();
+ try {
+ for (int i = 0; i < operands.size(); i += 2) {
+ final String key = SqlLiteral.unchain(operands.get(i)).getValueAs(String.class);
+ final String value =
+ SqlLiteral.unchain(operands.get(i + 1)).getValueAs(String.class);
+ map.put(key, value);
+ }
+ } catch (Exception e) {
+ // Not all children are literals
+ return null;
+ }
+ return map;
+ }
+
+ /** A MAP constructor is considered a literal if all its key-value children are string literals. */
+ private static boolean isLiteralMap(SqlNode sqlNode) {
+ if (sqlNode.getKind() != SqlKind.MAP_VALUE_CONSTRUCTOR) {
+ return false;
+ }
+ return ((SqlCall) sqlNode)
+ .getOperandList().stream().allMatch(CallBindingCallContext::isStringLiteral);
+ }
+
+ /**
+ * Checks if a node is a string literal. Also handles implicit CASTs, because Calcite wraps
+ * string literals inside MAP constructors in implicit CASTs when the values have different
+ * lengths (e.g. MAP['INSERT', 'I', 'UPDATE_AFTER', 'U'] coerces 'INSERT' to CAST('INSERT' AS
+ * VARCHAR(12))).
+ */
+ private static boolean isStringLiteral(final SqlNode node) {
+ if (node instanceof SqlLiteral) {
+ return SqlTypeName.CHAR_TYPES.contains(((SqlLiteral) node).getTypeName());
+ }
+ if (node.getKind() == SqlKind.CAST && node instanceof SqlCall) {
+ final SqlNode inner = ((SqlCall) node).operand(0);
+ return inner instanceof SqlLiteral
+ && SqlTypeName.CHAR_TYPES.contains(((SqlLiteral) inner).getTypeName());
+ }
+ return false;
+ }
}
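The MAP handling above boils down to pairing alternating key/value operands while tolerating the implicit CASTs described in the `isStringLiteral` Javadoc. The sketch below is hypothetical: it uses a toy `Node` model (`Literal`, `Cast`, and `Other` are invented here) instead of Calcite's `SqlNode`, and it combines the pairing done by `convertMap` with the CAST unwrapping done by `isStringLiteral`, so it illustrates the idea rather than copying either method.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of MAP-constructor literal extraction over a toy node model (not Calcite). */
public final class MapLiteralSketch {

    /** Toy stand-in for a SQL expression node. */
    interface Node {}

    /** A string literal such as 'INSERT'. */
    static final class Literal implements Node {
        final String value;

        Literal(String value) {
            this.value = value;
        }
    }

    /** An implicit CAST around another node, as Calcite may insert for MAP values. */
    static final class Cast implements Node {
        final Node inner;

        Cast(Node inner) {
            this.inner = inner;
        }
    }

    /** Any other expression, e.g. a column reference; never treated as a literal. */
    static final class Other implements Node {}

    /** Returns the string value of a literal or of a CAST around a literal, otherwise null. */
    static String literalValue(Node node) {
        if (node instanceof Literal) {
            return ((Literal) node).value;
        }
        if (node instanceof Cast && ((Cast) node).inner instanceof Literal) {
            return ((Literal) ((Cast) node).inner).value;
        }
        return null;
    }

    /** Pairs alternating key/value operands; returns null if any operand is not a literal. */
    static Map<String, String> convertMap(List<Node> operands) {
        if (operands.size() % 2 != 0) {
            return null;
        }
        Map<String, String> map = new LinkedHashMap<>();
        for (int i = 0; i < operands.size(); i += 2) {
            String key = literalValue(operands.get(i));
            String value = literalValue(operands.get(i + 1));
            if (key == null || value == null) {
                return null;
            }
            map.put(key, value);
        }
        return map;
    }

    public static void main(String[] args) {
        List<Node> operands = List.of(
                new Cast(new Literal("INSERT")), new Literal("I"),
                new Literal("UPDATE_AFTER"), new Literal("U"));
        System.out.println(convertMap(operands)); // {INSERT=I, UPDATE_AFTER=U}
        System.out.println(convertMap(List.of(new Literal("INSERT"), new Other()))); // null
    }
}
```

A `LinkedHashMap` preserves the order in which the pairs were written in the SQL text, which keeps error messages and plan output stable.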
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
index cd226f9b417..f1afecd4ace 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
@@ -46,7 +46,9 @@ import org.apache.calcite.sql.SqlOperatorBinding;
import javax.annotation.Nullable;
import java.util.AbstractList;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -114,7 +116,7 @@ public final class OperatorBindingCallContext extends
AbstractSqlCallContext {
public boolean isArgumentLiteral(int pos) {
// Semantically a descriptor can be considered a literal,
// however, Calcite represents them as a call
- return binding.isOperandLiteral(pos, false) || isDescriptor(pos);
+ return binding.isOperandLiteral(pos, false) || isDescriptor(pos) || isMapConstructor(pos);
}
@Override
@@ -131,11 +133,14 @@ public final class OperatorBindingCallContext extends
AbstractSqlCallContext {
if (isArgumentNull(pos)) {
return Optional.empty();
}
- // Semantically a descriptor can be considered a literal,
- // Calcite represents them as a call
+ // Calcite represents DESCRIPTOR and MAP constructors as RexCall, not RexLiteral.
+ // Handle them explicitly by extracting values from the call operands.
if (isDescriptor(pos) && clazz == ColumnList.class) {
return Optional.ofNullable((T) convertColumnList(pos));
}
+ if (isMapConstructor(pos) && clazz == Map.class) {
+ return Optional.ofNullable((T) convertMap(pos));
+ }
try {
return Optional.ofNullable(
getLiteralValueAs(
@@ -194,21 +199,23 @@ public final class OperatorBindingCallContext extends
AbstractSqlCallContext {
}
private boolean isDescriptor(int pos) {
+ return isRexCallOfKind(pos, SqlKind.DESCRIPTOR);
+ }
+
+ private boolean isMapConstructor(int pos) {
+ return isRexCallOfKind(pos, SqlKind.MAP_VALUE_CONSTRUCTOR);
+ }
+
+ private boolean isRexCallOfKind(int pos, SqlKind kind) {
if (binding instanceof RexCallBinding) {
- final List<RexNode> operands = ((RexCallBinding) binding).operands();
- final RexNode operand = operands.get(pos);
- return operand.getKind() == SqlKind.DESCRIPTOR;
+ final RexNode operand = ((RexCallBinding) binding).operands().get(pos);
+ return operand.getKind() == kind;
}
return false;
}
private boolean isDefault(int pos) {
- if (binding instanceof RexCallBinding) {
- final List<RexNode> operands = ((RexCallBinding) binding).operands();
- final RexNode operand = operands.get(pos);
- return operand.getKind() == SqlKind.DEFAULT;
- }
- return false;
+ return isRexCallOfKind(pos, SqlKind.DEFAULT);
}
private @Nullable RexTableArgCall getTableArgCall(int pos) {
@@ -222,8 +229,7 @@ public final class OperatorBindingCallContext extends
AbstractSqlCallContext {
private ColumnList convertColumnList(int pos) {
if (binding instanceof RexCallBinding) {
- final List<RexNode> operands = ((RexCallBinding) binding).operands();
- final RexCall call = (RexCall) operands.get(pos);
+ final RexCall call = (RexCall) ((RexCallBinding) binding).operands().get(pos);
final List<String> names =
call.getOperands().stream()
.map(RexLiteral::stringValue)
@@ -233,6 +239,27 @@ public final class OperatorBindingCallContext extends
AbstractSqlCallContext {
return null;
}
+ /**
+ * Extracts a {@code Map<String, String>} from a MAP_VALUE_CONSTRUCTOR RexCall whose operands
+ * are alternating key-value literal pairs.
+ */
+ private @Nullable Map<String, String> convertMap(int pos) {
+ if (!(binding instanceof RexCallBinding)) {
+ return null;
+ }
+ final RexCall call = (RexCall) ((RexCallBinding) binding).operands().get(pos);
+ final List<RexNode> operands = call.getOperands();
+ final Map<String, String> map = new LinkedHashMap<>();
+ for (int i = 0; i < operands.size(); i += 2) {
+ final @Nullable String key = RexLiteral.stringValue(operands.get(i));
+ final @Nullable String value = RexLiteral.stringValue(operands.get(i + 1));
+ if (key != null) {
+ map.put(key, value);
+ }
+ }
+ return map;
+ }
+
private @Nullable StaticArgument getStaticArg(int pos) {
final SqlOperator operator = binding.getOperator();
if (!(operator instanceof BridgingSqlFunction)) {
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
new file mode 100644
index 00000000000..798044effa7
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.plan.nodes.exec.testutils.SemanticTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.List;
+
+/** Semantic tests for the built-in TO_CHANGELOG process table function. */
+public class ToChangelogSemanticTests extends SemanticTestBase {
+
+ @Override
+ protected void applyDefaultEnvironmentOptions(TableConfig config) {
+ super.applyDefaultEnvironmentOptions(config);
+ config.set(
+ OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
+ OptimizerConfigOptions.NonDeterministicUpdateStrategy.IGNORE);
+ }
+
+ @Override
+ public List<TableTestProgram> programs() {
+ return List.of(
+ ToChangelogTestPrograms.INSERT_ONLY_INPUT,
+ ToChangelogTestPrograms.UPDATING_INPUT,
+ ToChangelogTestPrograms.CUSTOM_OP_MAPPING,
+ ToChangelogTestPrograms.CUSTOM_OP_NAME,
+ ToChangelogTestPrograms.TABLE_API_DEFAULT,
+ ToChangelogTestPrograms.LAG_ON_UPSERT_VIA_CHANGELOG,
+ ToChangelogTestPrograms.LAG_ON_RETRACT_VIA_CHANGELOG,
+ ToChangelogTestPrograms.MISSING_PARTITION_BY,
+ ToChangelogTestPrograms.INVALID_DESCRIPTOR,
+ ToChangelogTestPrograms.INVALID_OP_MAPPING);
+ }
+}
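The three failing programs registered last (MISSING_PARTITION_BY, INVALID_DESCRIPTOR, INVALID_OP_MAPPING) pin down which TO_CHANGELOG arguments are rejected at planning time. The following pure-JDK sketch restates those rules. The class `ToChangelogArgumentChecks` and its local `Kind` enum are hypothetical, and only the error messages are copied from the test programs. The real checks presumably run in the planner's type inference (for example `ToChangelogTypeStrategy`, which is part of this commit but not shown here) and may be ordered or structured differently.

```java
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of the argument checks exercised by the failing TO_CHANGELOG programs.
 * Not the Flink implementation; only the error messages are taken from the tests.
 */
final class ToChangelogArgumentChecks {

    /** Local stand-in for org.apache.flink.types.RowKind (same constant names). */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /**
     * Returns the first error message, or null if all checks pass.
     *
     * @param partitionBy columns named in PARTITION BY (empty if the clause is missing)
     * @param opDescriptor columns inside DESCRIPTOR(...) for 'op', or null if 'op' is omitted
     * @param opMapping the 'op_mapping' MAP argument, or null if omitted
     */
    static String validate(
            List<String> partitionBy, List<String> opDescriptor, Map<String, String> opMapping) {
        if (partitionBy.isEmpty()) {
            return "Table argument 'input' requires a PARTITION BY clause for parallel processing.";
        }
        if (opDescriptor != null && opDescriptor.size() != 1) {
            return "The descriptor for argument 'op' must contain exactly one column name.";
        }
        if (opMapping != null) {
            for (String kindName : opMapping.keySet()) {
                if (!isKindName(kindName)) {
                    return "Invalid target mapping for argument 'op_mapping'.";
                }
            }
        }
        return null;
    }

    /** True if the name matches one of the four change kinds exactly. */
    private static boolean isKindName(String name) {
        for (Kind kind : Kind.values()) {
            if (kind.name().equals(name)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Mirrors MISSING_PARTITION_BY, INVALID_DESCRIPTOR and INVALID_OP_MAPPING.
        System.out.println(validate(List.of(), null, null));
        System.out.println(validate(List.of("id"), List.of("a", "b"), null));
        System.out.println(validate(List.of("id"), null, Map.of("INVALID_KIND", "X")));
    }
}
```

Running `main` prints the three expected messages in the same order as the programs.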
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
new file mode 100644
index 00000000000..22602bcf38a
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import java.time.Instant;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** {@link TableTestProgram} definitions for testing the built-in TO_CHANGELOG PTF. */
+public class ToChangelogTestPrograms {
+
+ private static final SourceTestStep SIMPLE_SOURCE =
+ SourceTestStep.newBuilder("t")
+ .addSchema("id INT", "name STRING")
+ .addMode(ChangelogMode.insertOnly())
+ .producedValues(Row.ofKind(RowKind.INSERT, 1, "Alice"))
+ .build();
+
+ // --------------------------------------------------------------------------------------------
+ // SQL tests
+ // --------------------------------------------------------------------------------------------
+
+ public static final TableTestProgram INSERT_ONLY_INPUT =
+ TableTestProgram.of("to-changelog-insert-only", "insert-only input produces op=INSERT")
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema("id INT", "name STRING")
+ .addMode(ChangelogMode.insertOnly())
+ .producedValues(
+ Row.ofKind(RowKind.INSERT, 1, "Alice"),
+ Row.ofKind(RowKind.INSERT, 2, "Bob"))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("id INT", "op STRING", "name STRING")
+ .consumedValues("+I[1, INSERT, Alice]", "+I[2, INSERT, Bob]")
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE t PARTITION BY id)")
+ .build();
+
+ public static final TableTestProgram UPDATING_INPUT =
+ TableTestProgram.of(
+ "to-changelog-updating-input",
+ "retract input produces all op codes including UPDATE_BEFORE")
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema(
+ "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT")
+ .addMode(ChangelogMode.all())
+ .producedValues(
+ Row.ofKind(RowKind.INSERT, "Alice", 10L),
+ Row.ofKind(RowKind.INSERT, "Bob", 20L),
+ Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 10L),
+ Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L),
+ Row.ofKind(RowKind.DELETE, "Bob", 20L))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("name STRING", "op STRING", "score BIGINT")
+ .consumedValues(
+ "+I[Alice, INSERT, 10]",
+ "+I[Bob, INSERT, 20]",
+ "+I[Alice, UPDATE_BEFORE, 10]",
+ "+I[Alice, UPDATE_AFTER, 30]",
+ "+I[Bob, DELETE, 20]")
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE t PARTITION BY name)")
+ .build();
+
+ public static final TableTestProgram CUSTOM_OP_MAPPING =
+ TableTestProgram.of(
+ "to-changelog-custom-op-mapping",
+ "custom op_mapping maps RowKinds to user-defined codes and drops unmapped")
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema(
+ "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT")
+ .addMode(ChangelogMode.all())
+ .producedValues(
+ Row.ofKind(RowKind.INSERT, "Alice", 10L),
+ Row.ofKind(RowKind.INSERT, "Bob", 20L),
+ Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 10L),
+ Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L),
+ Row.ofKind(RowKind.DELETE, "Bob", 20L))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("name STRING", "op_code STRING", "score BIGINT")
+ .consumedValues(
+ "+I[Alice, I, 10]",
+ "+I[Bob, I, 20]",
+ "+I[Alice, U, 30]")
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+ + "input => TABLE t PARTITION BY name, "
+ + "op => DESCRIPTOR(op_code), "
+ + "op_mapping => MAP['INSERT','I', 'UPDATE_AFTER','U'])")
+ .build();
+
+ public static final TableTestProgram CUSTOM_OP_NAME =
+ TableTestProgram.of(
+ "to-changelog-custom-op-name", "custom op column name via DESCRIPTOR")
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema("id INT", "name STRING")
+ .addMode(ChangelogMode.insertOnly())
+ .producedValues(Row.ofKind(RowKind.INSERT, 1, "Alice"))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("id INT", "operation STRING", "name STRING")
+ .consumedValues("+I[1, INSERT, Alice]")
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+ + "input => TABLE t PARTITION BY id, "
+ + "op => DESCRIPTOR(operation))")
+ .build();
+
+ // --------------------------------------------------------------------------------------------
+ // Table API test
+ // --------------------------------------------------------------------------------------------
+
+ public static final TableTestProgram TABLE_API_DEFAULT =
+ TableTestProgram.of(
+ "to-changelog-table-api-default",
+ "PartitionedTable.toChangelog() convenience method")
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema("id INT", "name STRING")
+ .addMode(ChangelogMode.insertOnly())
+ .producedValues(
+ Row.ofKind(RowKind.INSERT, 1, "Alice"),
+ Row.ofKind(RowKind.INSERT, 2, "Bob"))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("id INT", "op STRING", "name STRING")
+ .consumedValues("+I[1, INSERT, Alice]", "+I[2, INSERT, Bob]")
+ .build())
+ .runTableApi(env -> env.from("t").partitionBy($("id")).toChangelog(), "sink")
+ .build();
+
+ // --------------------------------------------------------------------------------------------
+ // Use case: LAG on updating streams via TO_CHANGELOG
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * An upsert source produces INSERT, UPDATE_AFTER, and DELETE events. TO_CHANGELOG turns them into
+ * append-only rows with explicit op codes, so LAG (which fails directly on upsert streams) can
+ * track status transitions. The op_mapping keeps only INSERT and UPDATE_AFTER; DELETE is dropped.
+ */
+ public static final TableTestProgram LAG_ON_UPSERT_VIA_CHANGELOG =
+ TableTestProgram.of(
+ "to-changelog-lag-on-upsert",
+ "enables LAG on upsert stream with INSERT, UPDATE_AFTER, and DELETE")
+ .setupTableSource(
+ SourceTestStep.newBuilder("orders")
+ .addSchema(
+ "order_id INT NOT NULL",
+ "status STRING",
+ "ts TIMESTAMP_LTZ(3)",
+ "PRIMARY KEY (order_id) NOT ENFORCED",
+ "WATERMARK FOR ts AS ts - INTERVAL '1' SECOND")
+ .addMode(ChangelogMode.upsert())
+ .producedValues(
+ Row.ofKind(
+ RowKind.INSERT,
+ 1,
+ "CREATED",
+ Instant.ofEpochMilli(1000)),
+ Row.ofKind(
+ RowKind.INSERT,
+ 2,
+ "CREATED",
+ Instant.ofEpochMilli(2000)),
+ Row.ofKind(
+ RowKind.UPDATE_AFTER,
+ 1,
+ "SHIPPED",
+ Instant.ofEpochMilli(3000)),
+ Row.ofKind(
+ RowKind.UPDATE_AFTER,
+ 1,
+ "DELIVERED",
+ Instant.ofEpochMilli(4000)),
+ Row.ofKind(
+ RowKind.DELETE,
+ 2,
+ "CREATED",
+ Instant.ofEpochMilli(5000)))
+ .build())
+ .setupSql(
+ "CREATE VIEW orders_changelog AS "
+ + "SELECT order_id, op, status, ts FROM TO_CHANGELOG("
+ + " input => TABLE orders PARTITION BY order_id, "
+ + " op_mapping => MAP['INSERT', 'INSERT', 'UPDATE_AFTER', 'UPDATE_AFTER'])")
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema(
+ "order_id INT",
+ "op STRING",
+ "cur_status STRING",
+ "prev_status STRING")
+ .consumedValues(
+ "+I[1, INSERT, CREATED, null]",
+ "+I[2, INSERT, CREATED, null]",
+ "+I[1, UPDATE_AFTER, SHIPPED, CREATED]",
+ "+I[1, UPDATE_AFTER, DELIVERED, SHIPPED]")
+ .build())
+ .runSql(
+ "INSERT INTO sink "
+ + "SELECT order_id, op, status, "
+ + " LAG(status) OVER (PARTITION BY order_id ORDER BY ts) AS prev_status "
+ + "FROM orders_changelog")
+ .build();
+
+ /**
+ * A retract source produces UPDATE_BEFORE + UPDATE_AFTER pairs and DELETE events. TO_CHANGELOG
+ * drops UPDATE_BEFORE and DELETE via op_mapping, keeping only forward-looking transitions for
+ * LAG to track.
+ */
+ public static final TableTestProgram LAG_ON_RETRACT_VIA_CHANGELOG =
+ TableTestProgram.of(
+ "to-changelog-lag-on-retract",
+ "enables LAG on retract stream, dropping UB and DELETE via op_mapping")
+ .setupTableSource(
+ SourceTestStep.newBuilder("orders")
+ .addSchema(
+ "order_id INT NOT NULL",
+ "status STRING",
+ "ts TIMESTAMP_LTZ(3)",
+ "PRIMARY KEY (order_id) NOT ENFORCED",
+ "WATERMARK FOR ts AS ts - INTERVAL '1' SECOND")
+ .addMode(ChangelogMode.all())
+ .producedValues(
+ Row.ofKind(
+ RowKind.INSERT,
+ 1,
+ "CREATED",
+ Instant.ofEpochMilli(1000)),
+ Row.ofKind(
+ RowKind.INSERT,
+ 2,
+ "CREATED",
+ Instant.ofEpochMilli(2000)),
+ Row.ofKind(
+ RowKind.UPDATE_BEFORE,
+ 1,
+ "CREATED",
+ Instant.ofEpochMilli(3000)),
+ Row.ofKind(
+ RowKind.UPDATE_AFTER,
+ 1,
+ "SHIPPED",
+ Instant.ofEpochMilli(3000)),
+ Row.ofKind(
+ RowKind.UPDATE_BEFORE,
+ 1,
+ "SHIPPED",
+ Instant.ofEpochMilli(4000)),
+ Row.ofKind(
+ RowKind.UPDATE_AFTER,
+ 1,
+ "DELIVERED",
+ Instant.ofEpochMilli(4000)),
+ Row.ofKind(
+ RowKind.DELETE,
+ 2,
+ "CREATED",
+ Instant.ofEpochMilli(5000)))
+ .build())
+ .setupSql(
+ "CREATE VIEW orders_changelog AS "
+ + "SELECT order_id, op, status, ts FROM TO_CHANGELOG("
+ + " input => TABLE orders PARTITION BY order_id, "
+ + " op_mapping => MAP['INSERT', 'INSERT', 'UPDATE_AFTER', 'UPDATE_AFTER'])")
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema(
+ "order_id INT",
+ "op STRING",
+ "cur_status STRING",
+ "prev_status STRING")
+ .consumedValues(
+ "+I[1, INSERT, CREATED, null]",
+ "+I[2, INSERT, CREATED, null]",
+ "+I[1, UPDATE_AFTER, SHIPPED, CREATED]",
+ "+I[1, UPDATE_AFTER, DELIVERED, SHIPPED]")
+ .build())
+ .runSql(
+ "INSERT INTO sink "
+ + "SELECT order_id, op, status, "
+ + " LAG(status) OVER (PARTITION BY order_id ORDER BY ts) AS prev_status "
+ + "FROM orders_changelog")
+ .build();
+
+ // --------------------------------------------------------------------------------------------
+ // Error validation tests
+ // --------------------------------------------------------------------------------------------
+
+ public static final TableTestProgram MISSING_PARTITION_BY =
+ TableTestProgram.of(
+ "to-changelog-missing-partition-by",
+ "fails when PARTITION BY is missing")
+ .setupTableSource(SIMPLE_SOURCE)
+ .runFailingSql(
+ "SELECT * FROM TO_CHANGELOG(input => TABLE t)",
+ ValidationException.class,
+ "Table argument 'input' requires a PARTITION BY clause for parallel processing.")
+ .build();
+
+ public static final TableTestProgram INVALID_DESCRIPTOR =
+ TableTestProgram.of(
+ "to-changelog-invalid-descriptor",
+ "fails when DESCRIPTOR has multiple columns")
+ .setupTableSource(SIMPLE_SOURCE)
+ .runFailingSql(
+ "SELECT * FROM TO_CHANGELOG("
+ + "input => TABLE t PARTITION BY id, "
+ + "op => DESCRIPTOR(a, b))",
+ ValidationException.class,
+ "The descriptor for argument 'op' must contain exactly one column name.")
+ .build();
+
+ public static final TableTestProgram INVALID_OP_MAPPING =
+ TableTestProgram.of(
+ "to-changelog-invalid-op-mapping",
+ "fails when op_mapping has invalid RowKind name")
+ .setupTableSource(SIMPLE_SOURCE)
+ .runFailingSql(
+ "SELECT * FROM TO_CHANGELOG("
+ + "input => TABLE t PARTITION BY id, "
+ + "op_mapping => MAP['INVALID_KIND', 'X'])",
+ ValidationException.class,
+ "Invalid target mapping for argument 'op_mapping'.")
+ .build();
+}
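Both LAG programs expect the same four sink rows, even though one source is upsert and the other is retract. This is because op_mapping keeps only INSERT and UPDATE_AFTER, and TO_CHANGELOG drops every unmapped change kind before LAG sees it. The following in-memory Java sketch replays the retract input from LAG_ON_RETRACT_VIA_CHANGELOG to illustrate this. `LagOverChangelogSketch` and its types are hypothetical. The sketch assumes events already arrive in `ts` order, so it does not model the watermark or the OVER-window ordering.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of TO_CHANGELOG followed by LAG(status) per order_id. */
final class LagOverChangelogSketch {

    /** Local stand-in for org.apache.flink.types.RowKind. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static final class Event {
        final Kind kind;
        final int orderId;
        final String status;

        Event(Kind kind, int orderId, String status) {
            this.kind = kind;
            this.orderId = orderId;
            this.status = status;
        }
    }

    /** Assumes events are already in ts order (true for the test data). */
    static List<String> run(List<Event> events, Map<Kind, String> opMapping) {
        Map<Integer, String> lastStatus = new HashMap<>(); // LAG state per partition key
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            String op = opMapping.get(e.kind);
            if (op == null) {
                continue; // TO_CHANGELOG drops kinds that have no op_mapping entry
            }
            // put() returns the previous status for this order_id, i.e. LAG(status); null at first.
            String prev = lastStatus.put(e.orderId, e.status);
            out.add("+I[" + e.orderId + ", " + op + ", " + e.status + ", " + prev + "]");
        }
        return out;
    }

    /** The retract input of LAG_ON_RETRACT_VIA_CHANGELOG, without timestamps. */
    static List<Event> retractOrders() {
        return List.of(
                new Event(Kind.INSERT, 1, "CREATED"),
                new Event(Kind.INSERT, 2, "CREATED"),
                new Event(Kind.UPDATE_BEFORE, 1, "CREATED"),
                new Event(Kind.UPDATE_AFTER, 1, "SHIPPED"),
                new Event(Kind.UPDATE_BEFORE, 1, "SHIPPED"),
                new Event(Kind.UPDATE_AFTER, 1, "DELIVERED"),
                new Event(Kind.DELETE, 2, "CREATED"));
    }

    /** Equivalent of MAP['INSERT', 'INSERT', 'UPDATE_AFTER', 'UPDATE_AFTER']. */
    static Map<Kind, String> insertAndUpdateAfterOnly() {
        Map<Kind, String> mapping = new EnumMap<>(Kind.class);
        mapping.put(Kind.INSERT, "INSERT");
        mapping.put(Kind.UPDATE_AFTER, "UPDATE_AFTER");
        return mapping;
    }

    public static void main(String[] args) {
        run(retractOrders(), insertAndUpdateAfterOnly()).forEach(System.out::println);
    }
}
```

Running `main` prints exactly the four rows listed in the program's `consumedValues`.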
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
new file mode 100644
index 00000000000..0d4380bcb2e
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.stream.sql;
+
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+/**
+ * Plan tests for the TO_CHANGELOG built-in process table function. Uses {@link
+ * ExplainDetail#CHANGELOG_MODE} to verify changelog mode propagation through the plan.
+ */
+public class ToChangelogTest extends TableTestBase {
+
+ private static final java.util.List<ExplainDetail> CHANGELOG_MODE =
+ Collections.singletonList(ExplainDetail.CHANGELOG_MODE);
+
+ private TableTestUtil util;
+
+ @BeforeEach
+ void setup() {
+ util = streamTestUtil(TableConfig.getDefault());
+ }
+
+ @Test
+ void testRetractSource() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE retract_source ("
+ + " id INT,"
+ + " name STRING,"
+ + " PRIMARY KEY (id) NOT ENFORCED"
+ + ") WITH ("
+ + " 'connector' = 'values',"
+ + " 'changelog-mode' = 'I,UB,UA,D'"
+ + ")");
+ util.verifyRelPlan(
+ "SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)",
+ CHANGELOG_MODE);
+ }
+
+ @Test
+ void testUpsertSource() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE upsert_source ("
+ + " id INT,"
+ + " name STRING,"
+ + " PRIMARY KEY (id) NOT ENFORCED"
+ + ") WITH ("
+ + " 'connector' = 'values',"
+ + " 'changelog-mode' = 'I,UA,D'"
+ + ")");
+ util.verifyRelPlan(
+ "SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source PARTITION BY id)",
+ CHANGELOG_MODE);
+ }
+
+ @Test
+ void testInsertOnlySource() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE insert_only_source ("
+ + " id INT,"
+ + " name STRING"
+ + ") WITH ('connector' = 'values')");
+ util.verifyRelPlan(
+ "SELECT * FROM TO_CHANGELOG(input => TABLE insert_only_source PARTITION BY id)",
+ CHANGELOG_MODE);
+ }
+}
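The three plan tests differ only in the values connector's 'changelog-mode' option. 'I,UB,UA,D' declares a retract source, and 'I,UA,D' declares an upsert source. Omitting the option yields an insert-only scan, shown as [I] in the plan. The following hypothetical helper (`ChangelogModeSketch`, not the connector's own parser) decodes these code strings and classifies them the same way the test names do.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical decoder for the 'changelog-mode' code strings used by the plan tests. */
final class ChangelogModeSketch {

    /** Local stand-in for org.apache.flink.types.RowKind. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static Set<Kind> parse(String mode) {
        Set<Kind> kinds = EnumSet.noneOf(Kind.class);
        for (String code : mode.split(",")) {
            switch (code.trim()) {
                case "I":
                    kinds.add(Kind.INSERT);
                    break;
                case "UB":
                    kinds.add(Kind.UPDATE_BEFORE);
                    break;
                case "UA":
                    kinds.add(Kind.UPDATE_AFTER);
                    break;
                case "D":
                    kinds.add(Kind.DELETE);
                    break;
                default:
                    throw new IllegalArgumentException("Unknown change code: " + code);
            }
        }
        return kinds;
    }

    /** insert-only = only INSERT; upsert = UPDATE_AFTER without UPDATE_BEFORE; else retract. */
    static String classify(Set<Kind> kinds) {
        if (kinds.equals(EnumSet.of(Kind.INSERT))) {
            return "insert-only";
        }
        if (kinds.contains(Kind.UPDATE_AFTER) && !kinds.contains(Kind.UPDATE_BEFORE)) {
            return "upsert";
        }
        return "retract";
    }

    public static void main(String[] args) {
        for (String mode : new String[] {"I", "I,UA,D", "I,UB,UA,D"}) {
            System.out.println(mode + " -> " + classify(parse(mode)));
        }
    }
}
```

In all three expected plans, TO_CHANGELOG itself reports changelogMode=[I], whichever classification its input has.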
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
new file mode 100644
index 00000000000..a3a15dc8ef4
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testInsertOnlySource">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE insert_only_source PARTITION BY id)]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG], select=[id,op,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, VARCHAR(2147483647) name)], changelogMode=[I])
++- Exchange(distribution=[hash[id]], changelogMode=[I])
+ +- TableSourceScan(table=[[default_catalog, default_database, insert_only_source]], fields=[id, name], changelogMode=[I])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRetractSource">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG], select=[id,op,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, VARCHAR(2147483647) name)], changelogMode=[I])
++- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D])
+ +- TableSourceScan(table=[[default_catalog, default_database, retract_source]], fields=[id, name], changelogMode=[I,UB,UA,D])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testUpsertSource">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source PARTITION BY id)]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG], select=[id,op,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, VARCHAR(2147483647) name)], changelogMode=[I])
++- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D])
+ +- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
+ +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
+ +- TableSourceScan(table=[[default_catalog, default_database, upsert_source]], fields=[id, name], changelogMode=[I,UA,D])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
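In testUpsertSource, the planner places a ChangelogNormalize between the upsert scan ([I,UA,D]) and TO_CHANGELOG. As a result, the PTF receives [I,UB,UA,D], just as in the retract case, which suggests that TO_CHANGELOG requests UPDATE_BEFORE messages from its input. The following simplified Java model of that normalization step keeps the latest row per key and expands each upsert into full retract messages. `UpsertNormalizeSketch` is hypothetical and omits state TTL and other details of the real operator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified, hypothetical model of upsert-to-retract normalization (not Flink code). */
final class UpsertNormalizeSketch {

    /** Local stand-in for org.apache.flink.types.RowKind. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static final class Change {
        final Kind kind;
        final int key;
        final String value;

        Change(Kind kind, int key, String value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return kind + "(" + key + ", " + value + ")";
        }
    }

    /** Keeps the latest value per key so every upsert can be expanded into retract messages. */
    static List<Change> normalize(List<Change> upserts) {
        Map<Integer, String> latest = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (Change c : upserts) {
            String previous = latest.get(c.key);
            if (c.kind == Kind.DELETE) {
                if (previous != null) { // in this model, deletes for unknown keys are dropped
                    latest.remove(c.key);
                    out.add(new Change(Kind.DELETE, c.key, previous)); // full row from state
                }
            } else if (previous == null) { // INSERT or UPDATE_AFTER for a new key
                latest.put(c.key, c.value);
                out.add(new Change(Kind.INSERT, c.key, c.value));
            } else { // update of a known key: retract the old row, then emit the new one
                latest.put(c.key, c.value);
                out.add(new Change(Kind.UPDATE_BEFORE, c.key, previous));
                out.add(new Change(Kind.UPDATE_AFTER, c.key, c.value));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Change> upserts = List.of(
                new Change(Kind.INSERT, 1, "CREATED"),
                new Change(Kind.UPDATE_AFTER, 1, "SHIPPED"),
                new Change(Kind.DELETE, 1, null)); // an upsert delete may carry only the key
        normalize(upserts).forEach(System.out::println);
    }
}
```

Running `main` prints INSERT(1, CREATED), UPDATE_BEFORE(1, CREATED), UPDATE_AFTER(1, SHIPPED), and DELETE(1, SHIPPED), one per line. The cost of this normalization is keyed state for every row, which is why the plan shows an extra Exchange on `id` in front of it.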
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index b194d08637c..3e8d5729ee7 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -612,6 +612,11 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
withQueryBlockAlias = false)
}
+ /** Java-friendly overload that accepts a list of [[ExplainDetail]]s. */
+ def verifyRelPlan(query: String, extraDetails: java.util.List[ExplainDetail]): Unit = {
+ verifyRelPlan(query, extraDetails.asScala.toSeq: _*)
+ }
+
/**
* Verify the AST (abstract syntax tree) and the optimized rel plan for the given INSERT
* statement.
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/BuiltInProcessTableFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/BuiltInProcessTableFunction.java
new file mode 100644
index 00000000000..1ac2e2e03e7
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/BuiltInProcessTableFunction.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.ptf;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.functions.BuiltInFunctionDefinition;
+import org.apache.flink.table.functions.FunctionRequirement;
+import org.apache.flink.table.functions.ProcessTableFunction;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.StateTypeStrategy;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.table.types.inference.TypeStrategies;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Base class for built-in process table functions that are constructed from {@link
+ * BuiltInFunctionDefinition#specialize(SpecializedContext)}.
+ *
+ * <p>Subclasses must offer a constructor that takes {@link SpecializedContext}.
+ *
+ * <p>By default, all built-in PTFs work on internal data structures. All argument types, output
+ * type, and state types are converted to internal representation (e.g. {@code RowData} instead of
+ * {@code Row}, {@code MapData} instead of {@code Map}).
+ */
+@Internal
+public abstract class BuiltInProcessTableFunction<T> extends ProcessTableFunction<T> {
+
+ private final transient BuiltInFunctionDefinition definition;
+
+ private final transient List<DataType> argumentDataTypes;
+
+ private final transient DataType internalOutputDataType;
+
+ private final transient LinkedHashMap<String, StateTypeStrategy> internalStateStrategies;
+
+ protected BuiltInProcessTableFunction(
+ final BuiltInFunctionDefinition definition, final SpecializedContext context) {
+ this.definition = definition;
+ final CallContext callContext = context.getCallContext();
+ final TypeInference definitionTypeInference =
+ definition.getTypeInference(callContext.getDataTypeFactory());
+
+ this.argumentDataTypes =
+ callContext.getArgumentDataTypes().stream()
+ .map(DataTypeUtils::toInternalDataType)
+ .collect(Collectors.toList());
+
+ this.internalOutputDataType =
+ callContext
+ .getOutputDataType()
+ .map(DataTypeUtils::toInternalDataType)
+ .orElseThrow(IllegalStateException::new);
+
+ this.internalStateStrategies =
+ toInternalStateStrategies(definitionTypeInference, callContext);
+ }
+
+ // Uses deprecated typedArguments() because staticArguments() does not apply the conversion
+ // class to TABLE arguments in TypeInferenceUtil.inferInputTypes, and we want to use internal
+ // types in our built-in functions.
+ @SuppressWarnings("deprecation")
+ @Override
+ public TypeInference getTypeInference(final DataTypeFactory typeFactory) {
+ final TypeInference definitionTypeInference = definition.getTypeInference(typeFactory);
+
+ return TypeInference.newBuilder()
+ .typedArguments(argumentDataTypes)
+ .stateTypeStrategies(internalStateStrategies)
+ .outputTypeStrategy(TypeStrategies.explicit(internalOutputDataType))
+ .disableSystemArguments(definitionTypeInference.disableSystemArguments())
+ .build();
+ }
+
+ /** Wraps each state type strategy to produce internal data types, preserving TTL. */
+ private static LinkedHashMap<String, StateTypeStrategy> toInternalStateStrategies(
+ final TypeInference definitionTypeInference, final CallContext callContext) {
+ final LinkedHashMap<String, StateTypeStrategy> result = new LinkedHashMap<>();
+ definitionTypeInference
+ .getStateTypeStrategies()
+ .forEach(
+ (name, strategy) ->
+ result.put(
+ name,
+ StateTypeStrategy.of(
+ ctx ->
+ strategy.inferType(ctx)
+ .map(
+ DataTypeUtils
+ ::toInternalDataType),
+ strategy.getTimeToLive(callContext).orElse(null))));
+ return result;
+ }
+
+ @Override
+ public Set<FunctionRequirement> getRequirements() {
+ if (definition != null) {
+ return definition.getRequirements();
+ }
+ return super.getRequirements();
+ }
+
+ @Override
+ public boolean isDeterministic() {
+ if (definition != null) {
+ return definition.isDeterministic();
+ }
+ return super.isDeterministic();
+ }
+}
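Every field of BuiltInProcessTableFunction is declared `transient`. When `definition` is null, `getRequirements()` and `isDeterministic()` fall back to the superclass. A plausible reason, assumed here rather than stated in the commit, is that the function instance gets serialized and the copy does not carry the transient fields. The following pure-JDK demo uses hypothetical class names to show that a `final transient` field reads as null after a Java serialization round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical demo: a final transient field is null in a deserialized copy. */
final class TransientDefinitionSketch {

    static final class Fn implements Serializable {
        private static final long serialVersionUID = 1L;

        // Plays the role of BuiltInProcessTableFunction#definition; not written by serialization.
        private final transient String definition;

        Fn(String definition) {
            this.definition = definition;
        }

        /** Same shape as getRequirements(): use the definition if present, else a fallback. */
        String requirements() {
            if (definition != null) {
                return "from definition " + definition;
            }
            return "fallback";
        }
    }

    /** Serializes and deserializes a value with plain Java serialization. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                // The constructor of Fn is not run here, so the transient field keeps its default.
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Fn original = new Fn("TO_CHANGELOG");
        System.out.println(original.requirements()); // from definition TO_CHANGELOG
        System.out.println(roundTrip(original).requirements()); // fallback
    }
}
```

The same reasoning covers `argumentDataTypes`, `internalOutputDataType`, and `internalStateStrategies`, which appear to be needed only by `getTypeInference()` during planning.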
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
new file mode 100644
index 00000000000..75211e164f5
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.ptf;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+import org.apache.flink.table.functions.TableSemantics;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.ColumnList;
+import org.apache.flink.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Runtime implementation of {@link BuiltInFunctionDefinitions#TO_CHANGELOG}.
+ *
+ * <p>Converts each input row into an INSERT row with an operation code column and drops rows
+ * whose kind is unmapped. The output schema is {@code [op_column, ...non_partition_columns...]};
+ * the framework prepends partition key columns automatically.
+ *
+ * <p>Uses {@link ProjectedRowData} for zero-copy projection of non-partition columns and {@link
+ * JoinedRowData} to combine the op column with the projected input.
+ */
+@Internal
+public class ToChangelogFunction extends BuiltInProcessTableFunction<RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Map<RowKind, String> DEFAULT_OP_MAPPING =
+ Map.of(
+ RowKind.INSERT, "INSERT",
+ RowKind.UPDATE_BEFORE, "UPDATE_BEFORE",
+ RowKind.UPDATE_AFTER, "UPDATE_AFTER",
+ RowKind.DELETE, "DELETE");
+
+ private final Map<RowKind, String> rawOpMap;
+ private final int[] nonPartitionIndices;
+
+ private transient Map<RowKind, StringData> opMap;
+ private transient ProjectedRowData projectedInput;
+ private transient GenericRowData opRow;
+ private transient JoinedRowData output;
+
+ @SuppressWarnings("unchecked")
+ public ToChangelogFunction(final SpecializedContext context) {
+ super(BuiltInFunctionDefinitions.TO_CHANGELOG, context);
+ final CallContext callContext = context.getCallContext();
+
+ final TableSemantics semantics =
+ callContext
+ .getTableSemantics(0)
+ .orElseThrow(() -> new IllegalStateException("Table argument expected."));
+ final int[] partitionKeys = semantics.partitionByColumns();
+ final Set<Integer> partitionKeySet =
+ IntStream.of(partitionKeys).boxed().collect(Collectors.toSet());
+
+ final RowType inputType = (RowType) semantics.dataType().getLogicalType();
+ this.nonPartitionIndices =
+ buildNonPartitionIndices(inputType.getFieldCount(), partitionKeySet);
+
+ final Map<String, String> opMapping =
+ callContext.getArgumentValue(2, Map.class).orElse(null);
+ this.rawOpMap = buildOpMap(opMapping);
+ }
+
+ @Override
+ public void open(final FunctionContext context) throws Exception {
+ super.open(context);
+ opMap = new EnumMap<>(RowKind.class);
+ rawOpMap.forEach((kind, code) -> opMap.put(kind, StringData.fromString(code)));
+ projectedInput = ProjectedRowData.from(nonPartitionIndices);
+ opRow = new GenericRowData(1);
+ output = new JoinedRowData();
+ }
+
+ private static int[] buildNonPartitionIndices(
+ final int fieldCount, final Set<Integer> partitionKeySet) {
+ return IntStream.range(0, fieldCount).filter(i -> !partitionKeySet.contains(i)).toArray();
+ }
+
+ private static Map<RowKind, String> buildOpMap(@Nullable final Map<String, String> opMapping) {
+ if (opMapping == null) {
+ return new EnumMap<>(DEFAULT_OP_MAPPING);
+ }
+ final Map<RowKind, String> map = new EnumMap<>(RowKind.class);
+ opMapping.forEach((name, code) -> map.put(RowKind.valueOf(name), code));
+ return map;
+ }
+
+ public void eval(
+ final Context ctx,
+ final RowData input,
+ @Nullable final ColumnList op,
+ @Nullable final MapData opMapping) {
+ final StringData opCode = opMap.get(input.getRowKind());
+ if (opCode == null) {
+ return;
+ }
+
+ opRow.setField(0, opCode);
+ projectedInput.replaceRow(input);
+ collect(output.replace(opRow, projectedInput));
+ }
+}
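ToChangelogFunction does its work in three small pieces: `buildNonPartitionIndices`, `buildOpMap`, and `eval`. The following array-based Java sketch mirrors that logic outside Flink. `ToChangelogEvalSketch` and its local `Kind` enum are hypothetical. The sketch copies values into a new array, whereas the real function uses ProjectedRowData and JoinedRowData to avoid copying. After `eval`, the framework prepends the partition key, so the emitted [U, 30] becomes the sink row +I[Alice, U, 30] in CUSTOM_OP_MAPPING.

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Hypothetical, array-based model of ToChangelogFunction's per-row logic. */
final class ToChangelogEvalSketch {

    /** Local stand-in for org.apache.flink.types.RowKind. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Like buildNonPartitionIndices: every field position that is not a partition key. */
    static int[] nonPartitionIndices(int fieldCount, int[] partitionKeys) {
        Set<Integer> keys = IntStream.of(partitionKeys).boxed().collect(Collectors.toSet());
        return IntStream.range(0, fieldCount).filter(i -> !keys.contains(i)).toArray();
    }

    /** Like buildOpMap: null selects the default mapping, otherwise names go through valueOf. */
    static Map<Kind, String> opMap(Map<String, String> opMapping) {
        Map<Kind, String> map = new EnumMap<>(Kind.class);
        if (opMapping == null) {
            for (Kind kind : Kind.values()) {
                map.put(kind, kind.name()); // default: each kind maps to its own name
            }
        } else {
            // valueOf throws IllegalArgumentException for unknown names; in Flink the planner
            // rejects such names earlier ("Invalid target mapping for argument 'op_mapping'.").
            opMapping.forEach((name, code) -> map.put(Kind.valueOf(name), code));
        }
        return map;
    }

    /** Like eval(): returns [op, non-partition fields...], or null when the kind is unmapped. */
    static Object[] eval(Kind kind, Object[] row, int[] nonPartition, Map<Kind, String> opMap) {
        String op = opMap.get(kind);
        if (op == null) {
            return null; // nothing is collected for this row
        }
        Object[] out = new Object[1 + nonPartition.length];
        out[0] = op;
        for (int i = 0; i < nonPartition.length; i++) {
            out[i + 1] = row[nonPartition[i]]; // copied here; the real function projects lazily
        }
        return out;
    }

    public static void main(String[] args) {
        // Table (name, score) partitioned by name (index 0), as in CUSTOM_OP_MAPPING.
        int[] nonPartition = nonPartitionIndices(2, new int[] {0});
        Map<Kind, String> ops = opMap(Map.of("INSERT", "I", "UPDATE_AFTER", "U"));
        Object[] updated = eval(Kind.UPDATE_AFTER, new Object[] {"Alice", 30L}, nonPartition, ops);
        Object[] deleted = eval(Kind.DELETE, new Object[] {"Bob", 20L}, nonPartition, ops);
        System.out.println(Arrays.toString(updated)); // [U, 30]
        System.out.println(Arrays.toString(deleted)); // null (DELETE is unmapped)
    }
}
```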