This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new bcb3682605e [FLINK-39419][table] Switch TO_CHANGELOG to row semantics with full deletes + require update before
bcb3682605e is described below
commit bcb3682605e46c23c074fe2d31f0639cc22ff212
Author: Gustavo de Morais <[email protected]>
AuthorDate: Tue Apr 14 09:38:44 2026 +0200
[FLINK-39419][table] Switch TO_CHANGELOG to row semantics with full deletes + require update before
This closes #27911.
---
.../docs/sql/reference/queries/changelog.md | 32 +++---
.../pyflink/table/tests/test_table_completeness.py | 1 +
.../apache/flink/table/api/PartitionedTable.java | 34 ------
.../java/org/apache/flink/table/api/Table.java | 32 ++++++
.../apache/flink/table/api/internal/TableImpl.java | 10 +-
.../functions/BuiltInFunctionDefinitions.java | 11 +-
.../strategies/ToChangelogTypeStrategy.java | 13 +--
.../inference/CallBindingCallContext.java | 18 +++-
.../exec/stream/ToChangelogSemanticTests.java | 2 +-
.../nodes/exec/stream/ToChangelogTestPrograms.java | 115 ++++++++++++---------
.../planner/plan/stream/sql/ToChangelogTest.java | 9 +-
.../planner/plan/stream/sql/ToChangelogTest.xml | 49 ++++++---
.../runtime/functions/ptf/ToChangelogFunction.java | 35 +------
13 files changed, 187 insertions(+), 174 deletions(-)
diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md
index 15ad336f4d0..61843ab5c2f 100644
--- a/docs/content/docs/sql/reference/queries/changelog.md
+++ b/docs/content/docs/sql/reference/queries/changelog.md
@@ -44,7 +44,7 @@ This is useful when you need to materialize changelog events into a downstream s
```sql
SELECT * FROM TO_CHANGELOG(
- input => TABLE source_table PARTITION BY key_col,
+ input => TABLE source_table,
[op => DESCRIPTOR(op_column_name),]
[op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...]]
)
@@ -54,7 +54,7 @@ SELECT * FROM TO_CHANGELOG(
| Parameter | Required | Description |
|:-------------|:---------|:------------|
-| `input` | Yes | The input table. Must include `PARTITION BY` for
parallel execution. Accepts insert-only, retract, and upsert tables. |
+| `input` | Yes | The input table. Accepts insert-only, retract, and
upsert tables. |
| `op` | No | A `DESCRIPTOR` with a single column name for the
operation code column. Defaults to `op`. |
| `op_mapping` | No | A `MAP<STRING, STRING>` mapping change operation
names to custom output codes. Keys can contain comma-separated names to map
multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When
provided, only mapped operations are forwarded - unmapped events are dropped.
Each change operation may appear at most once across all entries. |
@@ -74,7 +74,7 @@ When `op_mapping` is omitted, all four change operations are
mapped to their sta
The output columns are ordered as:
```
-[partition_key_columns, op_column, remaining_columns]
+[op_column, all_input_columns]
```
All output rows have `INSERT` - the table is always append-only.
@@ -85,25 +85,25 @@ All output rows have `INSERT` - the table is always
append-only.
```sql
-- Input: retract table from an aggregation
--- +I[id:1, name:'Alice', cnt:1]
--- +U[id:1, name:'Alice', cnt:2]
--- -D[id:2, name:'Bob', cnt:1]
+-- +I[name:'Alice', cnt:1]
+-- +U[name:'Alice', cnt:2]
+-- -D[name:'Bob', cnt:1]
SELECT * FROM TO_CHANGELOG(
- input => TABLE my_aggregation PARTITION BY id
+ input => TABLE my_aggregation
)
-- Output (append-only):
--- +I[id:1, op:'INSERT', name:'Alice', cnt:1]
--- +I[id:1, op:'UPDATE_AFTER', name:'Alice', cnt:2]
--- +I[id:2, op:'DELETE', name:'Bob', cnt:1]
+-- +I[op:'INSERT', name:'Alice', cnt:1]
+-- +I[op:'UPDATE_AFTER', name:'Alice', cnt:2]
+-- +I[op:'DELETE', name:'Bob', cnt:1]
```
#### Custom operation column name
```sql
SELECT * FROM TO_CHANGELOG(
- input => TABLE my_aggregation PARTITION BY id,
+ input => TABLE my_aggregation,
op => DESCRIPTOR(operation)
)
-- The op column is now named 'operation' instead of 'op'
@@ -113,7 +113,7 @@ SELECT * FROM TO_CHANGELOG(
```sql
SELECT * FROM TO_CHANGELOG(
- input => TABLE my_aggregation PARTITION BY id,
+ input => TABLE my_aggregation,
op => DESCRIPTOR(op_code),
op_mapping => MAP['INSERT', 'I', 'UPDATE_AFTER', 'U']
)
@@ -126,7 +126,7 @@ SELECT * FROM TO_CHANGELOG(
```sql
SELECT * FROM TO_CHANGELOG(
- input => TABLE my_aggregation PARTITION BY id,
+ input => TABLE my_aggregation,
op => DESCRIPTOR(deleted),
op_mapping => MAP['INSERT, UPDATE_AFTER', 'false', 'DELETE', 'true']
)
@@ -139,16 +139,16 @@ SELECT * FROM TO_CHANGELOG(
```java
// Default: adds 'op' column and supports all changelog modes
-Table result = myTable.partitionBy($("id")).toChangelog();
+Table result = myTable.toChangelog();
// With custom parameters
-Table result = myTable.partitionBy($("id")).toChangelog(
+Table result = myTable.toChangelog(
descriptor("op_code").asArgument("op"),
map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
);
// Deletion flag pattern: comma-separated keys map multiple change operations
to the same code
-Table result = myTable.partitionBy($("id")).toChangelog(
+Table result = myTable.toChangelog(
descriptor("deleted").asArgument("op"),
map("INSERT, UPDATE_AFTER", "false", "DELETE",
"true").asArgument("op_mapping")
);
diff --git a/flink-python/pyflink/table/tests/test_table_completeness.py b/flink-python/pyflink/table/tests/test_table_completeness.py
index feca7e63b1a..21bb038aef3 100644
--- a/flink-python/pyflink/table/tests/test_table_completeness.py
+++ b/flink-python/pyflink/table/tests/test_table_completeness.py
@@ -42,6 +42,7 @@ class TableAPICompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCase):
'asArgument',
'process',
'partitionBy',
+ 'toChangelog',
}
@classmethod
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
index 7db750a296d..32180e51fb9 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.annotation.ArgumentTrait;
-import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
@@ -139,37 +138,4 @@ public interface PartitionedTable {
* @see ProcessTableFunction
*/
Table process(Class<? extends UserDefinedFunction> function, Object...
arguments);
-
- /**
- * Converts this dynamic table into an append-only table with an explicit
operation code column
- * using the built-in {@code TO_CHANGELOG} process table function.
- *
- * <p>Each input row - regardless of its original change operation - is
emitted as an
- * INSERT-only row with a string {@code "op"} column indicating the
original operation (INSERT,
- * UPDATE_AFTER, DELETE, etc.).
- *
- * <p>Optional arguments can be passed using named expressions:
- *
- * <pre>{@code
- * // Default: adds 'op' column and supports all changelog modes
- * table.partitionBy($("id")).toChangelog();
- *
- * // Custom op column name and mapping
- * table.partitionBy($("id")).toChangelog(
- * descriptor("op_code").asArgument("op"),
- * map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
- * );
- *
- * // Deletion flag pattern: comma-separated keys map multiple change
operations to the same code
- * table.partitionBy($("id")).toChangelog(
- * descriptor("deleted").asArgument("op"),
- * map("INSERT, UPDATE_AFTER", "false", "DELETE",
"true").asArgument("op_mapping")
- * );
- * }</pre>
- *
- * @param arguments optional named arguments for {@code op} and {@code
op_mapping}
- * @return an append-only {@link Table} with an {@code op} column
prepended to the non-partition
- * columns
- */
- Table toChangelog(Expression... arguments);
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index 2471b91227d..ef1785aecdd 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -1422,4 +1422,36 @@ public interface Table extends Explainable<Table>,
Executable {
* @see ProcessTableFunction
*/
Table process(Class<? extends UserDefinedFunction> function, Object...
arguments);
+
+ /**
+ * Converts this dynamic table into an append-only table with an explicit
operation code column
+ * using the built-in {@code TO_CHANGELOG} process table function.
+ *
+ * <p>Each input row - regardless of its original change operation - is
emitted as an
+ * INSERT-only row with a string {@code "op"} column indicating the
original operation (INSERT,
+ * UPDATE_AFTER, DELETE, etc.).
+ *
+ * <p>Optional arguments can be passed using named expressions:
+ *
+ * <pre>{@code
+ * // Default: adds 'op' column and supports all changelog modes
+ * table.toChangelog();
+ *
+ * // Custom op column name and mapping
+ * table.toChangelog(
+ * descriptor("op_code").asArgument("op"),
+ * map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
+ * );
+ *
+ * // Deletion flag pattern: comma-separated keys map multiple change
operations to the same code
+ * table.toChangelog(
+ * descriptor("deleted").asArgument("op"),
+ * map("INSERT, UPDATE_AFTER", "false", "DELETE",
"true").asArgument("op_mapping")
+ * );
+ * }</pre>
+ *
+ * @param arguments optional named arguments for {@code op} and {@code
op_mapping}
+ * @return an append-only {@link Table} with an {@code op} column
prepended to the input columns
+ */
+ Table toChangelog(Expression... arguments);
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
index 335f28f2f8e..73d70a83dfd 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
@@ -514,6 +514,11 @@ public class TableImpl implements Table {
function, unionTableAndArguments(operationTree,
tableEnvironment, arguments));
}
+ @Override
+ public Table toChangelog(Expression... arguments) {
+ return process(BuiltInFunctionDefinitions.TO_CHANGELOG.getName(),
(Object[]) arguments);
+ }
+
private TablePipeline insertInto(
ContextResolvedTable contextResolvedTable,
@Nullable InsertConflictStrategy conflictStrategy,
@@ -901,11 +906,6 @@ public class TableImpl implements Table {
createPartitionQueryOperation(),
table.tableEnvironment, arguments));
}
- @Override
- public Table toChangelog(Expression... arguments) {
- return process(BuiltInFunctionDefinitions.TO_CHANGELOG.getName(),
(Object[]) arguments);
- }
-
private QueryOperation createPartitionQueryOperation() {
return table.operationTreeBuilder.partition(partitionKeys,
table.operationTree);
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index e8783ba51c8..0f2f35a056a 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -782,15 +782,22 @@ public final class BuiltInFunctionDefinitions {
.name("TO_CHANGELOG")
.kind(PROCESS_TABLE)
.staticArguments(
+ // Row semantics (no PARTITION BY). Accepts
updating
+ // inputs. The planner inserts ChangelogNormalize
for
+ // upsert sources to produce UPDATE_BEFORE and full
+ // DELETE rows.
StaticArgument.table(
"input",
Row.class,
false,
EnumSet.of(
StaticArgumentTrait.TABLE,
-
StaticArgumentTrait.SET_SEMANTIC_TABLE,
+
StaticArgumentTrait.ROW_SEMANTIC_TABLE,
StaticArgumentTrait.SUPPORT_UPDATES,
-
StaticArgumentTrait.REQUIRE_UPDATE_BEFORE)),
+
StaticArgumentTrait.REQUIRE_UPDATE_BEFORE,
+ // Not strictly necessary but
explicitly state that
+ // we require full deletes.
+
StaticArgumentTrait.REQUIRE_FULL_DELETE)),
StaticArgument.scalar("op",
DataTypes.DESCRIPTOR(), true),
StaticArgument.scalar(
"op_mapping",
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
index d13d52a49c7..c910885d69e 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
@@ -40,8 +40,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
/** Type strategies for the {@code TO_CHANGELOG} process table function. */
@Internal
@@ -98,15 +96,10 @@ public final class ToChangelogTypeStrategy {
final String opColumnName = resolveOpColumnName(callContext);
final List<Field> inputFields =
DataType.getFields(semantics.dataType());
- final Set<Integer> partitionKeys =
intArrayToSet(semantics.partitionByColumns());
final List<Field> outputFields = new ArrayList<>();
outputFields.add(DataTypes.FIELD(opColumnName,
DataTypes.STRING()));
- for (int i = 0; i < inputFields.size(); i++) {
- if (!partitionKeys.contains(i)) {
- outputFields.add(inputFields.get(i));
- }
- }
+ outputFields.addAll(inputFields);
return Optional.of(DataTypes.ROW(outputFields).notNull());
};
@@ -199,9 +192,5 @@ public final class ToChangelogTypeStrategy {
.orElse(DEFAULT_OP_COLUMN_NAME);
}
- private static Set<Integer> intArrayToSet(final int[] array) {
- return IntStream.of(array).boxed().collect(Collectors.toSet());
- }
-
private ToChangelogTypeStrategy() {}
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
index e5968ce21fa..01c569a19ef 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
@@ -368,9 +368,11 @@ public final class CallBindingCallContext extends
AbstractSqlCallContext {
final Map<String, String> map = new LinkedHashMap<>();
try {
for (int i = 0; i < operands.size(); i += 2) {
- final String key =
SqlLiteral.unchain(operands.get(i)).getValueAs(String.class);
+ final String key =
+
SqlLiteral.unchain(unwrapCast(operands.get(i))).getValueAs(String.class);
final String value =
- SqlLiteral.unchain(operands.get(i +
1)).getValueAs(String.class);
+ SqlLiteral.unchain(unwrapCast(operands.get(i + 1)))
+ .getValueAs(String.class);
map.put(key, value);
}
} catch (Exception e) {
@@ -380,6 +382,18 @@ public final class CallBindingCallContext extends
AbstractSqlCallContext {
return map;
}
+ /** Unwraps implicit CHAR-type CASTs added by Calcite for length
normalization. */
+ private static SqlNode unwrapCast(final SqlNode node) {
+ if (node.getKind() == SqlKind.CAST && node instanceof SqlCall) {
+ final SqlNode inner = ((SqlCall) node).operand(0);
+ if (inner instanceof SqlLiteral
+ && SqlTypeName.CHAR_TYPES.contains(((SqlLiteral)
inner).getTypeName())) {
+ return inner;
+ }
+ }
+ return node;
+ }
+
/** A MAP constructor is a string literal if all its key-value children
are string literals. */
private static boolean isLiteralMap(SqlNode sqlNode) {
if (sqlNode.getKind() != SqlKind.MAP_VALUE_CONSTRUCTOR) {
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
index 706f7abd1d3..da3c9270eba 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
@@ -41,13 +41,13 @@ public class ToChangelogSemanticTests extends
SemanticTestBase {
return List.of(
ToChangelogTestPrograms.INSERT_ONLY_INPUT,
ToChangelogTestPrograms.UPDATING_INPUT,
+ ToChangelogTestPrograms.UPSERT_INPUT,
ToChangelogTestPrograms.CUSTOM_OP_MAPPING,
ToChangelogTestPrograms.CUSTOM_OP_NAME,
ToChangelogTestPrograms.TABLE_API_DEFAULT,
ToChangelogTestPrograms.LAG_ON_UPSERT_VIA_CHANGELOG,
ToChangelogTestPrograms.LAG_ON_RETRACT_VIA_CHANGELOG,
ToChangelogTestPrograms.DELETION_FLAG,
- ToChangelogTestPrograms.MISSING_PARTITION_BY,
ToChangelogTestPrograms.INVALID_DESCRIPTOR,
ToChangelogTestPrograms.INVALID_OP_MAPPING,
ToChangelogTestPrograms.DUPLICATE_ROW_KIND);
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
index bfc67097112..4056b652a79 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
@@ -28,8 +28,6 @@ import org.apache.flink.types.RowKind;
import java.time.Instant;
-import static org.apache.flink.table.api.Expressions.$;
-
/** {@link TableTestProgram} definitions for testing the built-in TO_CHANGELOG
PTF. */
public class ToChangelogTestPrograms {
@@ -56,11 +54,10 @@ public class ToChangelogTestPrograms {
.build())
.setupTableSink(
SinkTestStep.newBuilder("sink")
- .addSchema("id INT", "op STRING", "name
STRING")
- .consumedValues("+I[1, INSERT, Alice]",
"+I[2, INSERT, Bob]")
+ .addSchema("op STRING", "id INT", "name
STRING")
+ .consumedValues("+I[INSERT, 1, Alice]",
"+I[INSERT, 2, Bob]")
.build())
- .runSql(
- "INSERT INTO sink SELECT * FROM TO_CHANGELOG(input
=> TABLE t PARTITION BY id)")
+ .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input
=> TABLE t)")
.build();
public static final TableTestProgram UPDATING_INPUT =
@@ -81,16 +78,45 @@ public class ToChangelogTestPrograms {
.build())
.setupTableSink(
SinkTestStep.newBuilder("sink")
- .addSchema("name STRING", "op STRING",
"score BIGINT")
+ .addSchema("op STRING", "name STRING",
"score BIGINT")
.consumedValues(
- "+I[Alice, INSERT, 10]",
- "+I[Bob, INSERT, 20]",
- "+I[Alice, UPDATE_BEFORE, 10]",
- "+I[Alice, UPDATE_AFTER, 30]",
- "+I[Bob, DELETE, 20]")
+ "+I[INSERT, Alice, 10]",
+ "+I[INSERT, Bob, 20]",
+ "+I[UPDATE_BEFORE, Alice, 10]",
+ "+I[UPDATE_AFTER, Alice, 30]",
+ "+I[DELETE, Bob, 20]")
.build())
- .runSql(
- "INSERT INTO sink SELECT * FROM TO_CHANGELOG(input
=> TABLE t PARTITION BY name)")
+ .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input
=> TABLE t)")
+ .build();
+
+ public static final TableTestProgram UPSERT_INPUT =
+ TableTestProgram.of(
+ "to-changelog-upsert-input",
+ "upsert input gets ChangelogNormalize for
UPDATE_BEFORE and full deletes")
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema(
+ "name STRING PRIMARY KEY NOT
ENFORCED", "score BIGINT")
+ .addMode(ChangelogMode.upsert())
+ .producedValues(
+ Row.ofKind(RowKind.INSERT,
"Alice", 10L),
+ Row.ofKind(RowKind.INSERT, "Bob",
20L),
+ Row.ofKind(RowKind.UPDATE_AFTER,
"Alice", 30L),
+ // Key-only delete:
ChangelogNormalize fills
+ // in the full row from state
+ Row.ofKind(RowKind.DELETE, "Bob",
null))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("op STRING", "name STRING",
"score BIGINT")
+ .consumedValues(
+ "+I[INSERT, Alice, 10]",
+ "+I[INSERT, Bob, 20]",
+ "+I[UPDATE_BEFORE, Alice, 10]",
+ "+I[UPDATE_AFTER, Alice, 30]",
+ "+I[DELETE, Bob, 20]")
+ .build())
+ .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input
=> TABLE t)")
.build();
public static final TableTestProgram CUSTOM_OP_MAPPING =
@@ -111,15 +137,15 @@ public class ToChangelogTestPrograms {
.build())
.setupTableSink(
SinkTestStep.newBuilder("sink")
- .addSchema("name STRING", "op_code
STRING", "score BIGINT")
+ .addSchema("op_code STRING", "name
STRING", "score BIGINT")
.consumedValues(
- "+I[Alice, I, 10]",
- "+I[Bob, I, 20]",
- "+I[Alice, U, 30]")
+ "+I[I, Alice, 10]",
+ "+I[I, Bob, 20]",
+ "+I[U, Alice, 30]")
.build())
.runSql(
"INSERT INTO sink SELECT * FROM TO_CHANGELOG("
- + "input => TABLE t PARTITION BY name, "
+ + "input => TABLE t, "
+ "op => DESCRIPTOR(op_code), "
+ "op_mapping => MAP['INSERT','I',
'UPDATE_AFTER','U'])")
.build();
@@ -135,12 +161,12 @@ public class ToChangelogTestPrograms {
.build())
.setupTableSink(
SinkTestStep.newBuilder("sink")
- .addSchema("id INT", "operation STRING",
"name STRING")
- .consumedValues("+I[1, INSERT, Alice]")
+ .addSchema("operation STRING", "id INT",
"name STRING")
+ .consumedValues("+I[INSERT, 1, Alice]")
.build())
.runSql(
"INSERT INTO sink SELECT * FROM TO_CHANGELOG("
- + "input => TABLE t PARTITION BY id, "
+ + "input => TABLE t, "
+ "op => DESCRIPTOR(operation))")
.build();
@@ -151,7 +177,7 @@ public class ToChangelogTestPrograms {
public static final TableTestProgram TABLE_API_DEFAULT =
TableTestProgram.of(
"to-changelog-table-api-default",
- "PartitionedTable.toChangelog() convenience
method")
+ "Table.toChangelog() convenience method")
.setupTableSource(
SourceTestStep.newBuilder("t")
.addSchema("id INT", "name STRING")
@@ -162,10 +188,10 @@ public class ToChangelogTestPrograms {
.build())
.setupTableSink(
SinkTestStep.newBuilder("sink")
- .addSchema("id INT", "op STRING", "name
STRING")
- .consumedValues("+I[1, INSERT, Alice]",
"+I[2, INSERT, Bob]")
+ .addSchema("op STRING", "id INT", "name
STRING")
+ .consumedValues("+I[INSERT, 1, Alice]",
"+I[INSERT, 2, Bob]")
.build())
- .runTableApi(env ->
env.from("t").partitionBy($("id")).toChangelog(), "sink")
+ .runTableApi(env -> env.from("t").toChangelog(), "sink")
.build();
//
--------------------------------------------------------------------------------------------
@@ -219,8 +245,8 @@ public class ToChangelogTestPrograms {
.build())
.setupSql(
"CREATE VIEW orders_changelog AS "
- + "SELECT order_id, op, status, ts FROM
TO_CHANGELOG("
- + " input => TABLE orders PARTITION BY
order_id, "
+ + "SELECT op, order_id, status, ts FROM
TO_CHANGELOG("
+ + " input => TABLE orders, "
+ " op_mapping => MAP['INSERT', 'INSERT',
'UPDATE_AFTER', 'UPDATE_AFTER'])")
.setupTableSink(
SinkTestStep.newBuilder("sink")
@@ -299,8 +325,8 @@ public class ToChangelogTestPrograms {
.build())
.setupSql(
"CREATE VIEW orders_changelog AS "
- + "SELECT order_id, op, status, ts FROM
TO_CHANGELOG("
- + " input => TABLE orders PARTITION BY
order_id, "
+ + "SELECT op, order_id, status, ts FROM
TO_CHANGELOG("
+ + " input => TABLE orders, "
+ " op_mapping => MAP['INSERT', 'INSERT',
'UPDATE_AFTER', 'UPDATE_AFTER'])")
.setupTableSink(
SinkTestStep.newBuilder("sink")
@@ -348,16 +374,16 @@ public class ToChangelogTestPrograms {
.build())
.setupTableSink(
SinkTestStep.newBuilder("sink")
- .addSchema("name STRING", "deleted
STRING", "score BIGINT")
+ .addSchema("deleted STRING", "name
STRING", "score BIGINT")
.consumedValues(
- "+I[Alice, false, 10]",
- "+I[Bob, false, 20]",
- "+I[Alice, false, 30]",
- "+I[Bob, true, 20]")
+ "+I[false, Alice, 10]",
+ "+I[false, Bob, 20]",
+ "+I[false, Alice, 30]",
+ "+I[true, Bob, 20]")
.build())
.runSql(
"INSERT INTO sink SELECT * FROM TO_CHANGELOG("
- + "input => TABLE t PARTITION BY name, "
+ + "input => TABLE t, "
+ "op => DESCRIPTOR(deleted), "
+ "op_mapping => MAP['INSERT,
UPDATE_AFTER', 'false', 'DELETE', 'true'])")
.build();
@@ -366,17 +392,6 @@ public class ToChangelogTestPrograms {
// Error validation tests
//
--------------------------------------------------------------------------------------------
- public static final TableTestProgram MISSING_PARTITION_BY =
- TableTestProgram.of(
- "to-changelog-missing-partition-by",
- "fails when PARTITION BY is missing")
- .setupTableSource(SIMPLE_SOURCE)
- .runFailingSql(
- "SELECT * FROM TO_CHANGELOG(input => TABLE t)",
- ValidationException.class,
- "Table argument 'input' requires a PARTITION BY
clause for parallel processing.")
- .build();
-
public static final TableTestProgram INVALID_DESCRIPTOR =
TableTestProgram.of(
"to-changelog-invalid-descriptor",
@@ -384,7 +399,7 @@ public class ToChangelogTestPrograms {
.setupTableSource(SIMPLE_SOURCE)
.runFailingSql(
"SELECT * FROM TO_CHANGELOG("
- + "input => TABLE t PARTITION BY id, "
+ + "input => TABLE t, "
+ "op => DESCRIPTOR(a, b))",
ValidationException.class,
"The descriptor for argument 'op' must contain
exactly one column name.")
@@ -397,7 +412,7 @@ public class ToChangelogTestPrograms {
.setupTableSource(SIMPLE_SOURCE)
.runFailingSql(
"SELECT * FROM TO_CHANGELOG("
- + "input => TABLE t PARTITION BY id, "
+ + "input => TABLE t, "
+ "op_mapping => MAP['INVALID_KIND',
'X'])",
ValidationException.class,
"Unknown change operation: 'INVALID_KIND'")
@@ -410,7 +425,7 @@ public class ToChangelogTestPrograms {
.setupTableSource(SIMPLE_SOURCE)
.runFailingSql(
"SELECT * FROM TO_CHANGELOG("
- + "input => TABLE t PARTITION BY id, "
+ + "input => TABLE t, "
+ "op_mapping => MAP['INSERT, DELETE',
'A', 'DELETE', 'B'])",
ValidationException.class,
"Duplicate change operation: 'DELETE'")
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
index 0d4380bcb2e..ee9576b2e75 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
@@ -57,8 +57,7 @@ public class ToChangelogTest extends TableTestBase {
+ " 'changelog-mode' = 'I,UB,UA,D'"
+ ")");
util.verifyRelPlan(
- "SELECT * FROM TO_CHANGELOG(input => TABLE retract_source
PARTITION BY id)",
- CHANGELOG_MODE);
+ "SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)",
CHANGELOG_MODE);
}
@Test
@@ -74,8 +73,7 @@ public class ToChangelogTest extends TableTestBase {
+ " 'changelog-mode' = 'I,UA,D'"
+ ")");
util.verifyRelPlan(
- "SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source
PARTITION BY id)",
- CHANGELOG_MODE);
+ "SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source)",
CHANGELOG_MODE);
}
@Test
@@ -87,7 +85,6 @@ public class ToChangelogTest extends TableTestBase {
+ " name STRING"
+ ") WITH ('connector' = 'values')");
util.verifyRelPlan(
- "SELECT * FROM TO_CHANGELOG(input => TABLE insert_only_source
PARTITION BY id)",
- CHANGELOG_MODE);
+ "SELECT * FROM TO_CHANGELOG(input => TABLE
insert_only_source)", CHANGELOG_MODE);
}
}
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
index a3a15dc8ef4..91c73153bee 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
@@ -18,39 +18,60 @@ limitations under the License.
<Root>
<TestCase name="testInsertOnlySource">
<Resource name="sql">
- <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE insert_only_source
PARTITION BY id)]]>
+ <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE insert_only_source)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(op=[$0], id=[$1], name=[$2])
++- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(),
DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) op,
INTEGER id, VARCHAR(2147483647) name)])
+ +- LogicalProject(id=[$0], name=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
insert_only_source]])
+]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0),
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG],
select=[id,op,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op,
VARCHAR(2147483647) name)], changelogMode=[I])
-+- Exchange(distribution=[hash[id]], changelogMode=[I])
- +- TableSourceScan(table=[[default_catalog, default_database,
insert_only_source]], fields=[id, name], changelogMode=[I])
+ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(),
DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name],
rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647)
name)], changelogMode=[I])
++- TableSourceScan(table=[[default_catalog, default_database,
insert_only_source]], fields=[id, name], changelogMode=[I])
]]>
</Resource>
</TestCase>
<TestCase name="testRetractSource">
<Resource name="sql">
- <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE retract_source
PARTITION BY id)]]>
+ <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(op=[$0], id=[$1], name=[$2])
++- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(),
DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) op,
INTEGER id, VARCHAR(2147483647) name)])
+ +- LogicalProject(id=[$0], name=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
retract_source]])
+]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0),
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG],
select=[id,op,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op,
VARCHAR(2147483647) name)], changelogMode=[I])
-+- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D])
- +- TableSourceScan(table=[[default_catalog, default_database,
retract_source]], fields=[id, name], changelogMode=[I,UB,UA,D])
+ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(),
DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name],
rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647)
name)], changelogMode=[I])
++- TableSourceScan(table=[[default_catalog, default_database,
retract_source]], fields=[id, name], changelogMode=[I,UB,UA,D])
]]>
</Resource>
</TestCase>
<TestCase name="testUpsertSource">
<Resource name="sql">
- <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source
PARTITION BY id)]]>
+ <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source)]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(op=[$0], id=[$1], name=[$2])
++- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(),
DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) op,
INTEGER id, VARCHAR(2147483647) name)])
+ +- LogicalProject(id=[$0], name=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
upsert_source]])
+]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0),
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG],
select=[id,op,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op,
VARCHAR(2147483647) name)], changelogMode=[I])
-+- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D])
- +- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
- +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
- +- TableSourceScan(table=[[default_catalog, default_database,
upsert_source]], fields=[id, name], changelogMode=[I,UA,D])
+ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(),
DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name],
rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647)
name)], changelogMode=[I])
++- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
+ +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
+ +- TableSourceScan(table=[[default_catalog, default_database,
upsert_source]], fields=[id, name], changelogMode=[I,UA,D])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
index f16ea33375f..247f565228b 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
@@ -24,13 +24,10 @@ import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.utils.JoinedRowData;
-import org.apache.flink.table.data.utils.ProjectedRowData;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
-import org.apache.flink.table.functions.TableSemantics;
import org.apache.flink.table.types.inference.CallContext;
-import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.ColumnList;
import org.apache.flink.types.RowKind;
@@ -38,19 +35,14 @@ import javax.annotation.Nullable;
import java.util.EnumMap;
import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
/**
* Runtime implementation of {@link BuiltInFunctionDefinitions#TO_CHANGELOG}.
*
* <p>Converts each input row into an INSERT-only output row with an operation
code column. The
- * output schema is {@code [op_column, ...non_partition_columns...]} - the
framework prepends
- * partition key columns automatically.
+ * output schema is {@code [op_column, ...all_input_columns...]}.
*
- * <p>Uses {@link ProjectedRowData} for zero-copy projection of non-partition
columns and {@link
- * JoinedRowData} to combine the op column with the projected input.
+ * <p>Uses {@link JoinedRowData} to combine the op column with the full input
row.
*/
@Internal
public class ToChangelogFunction extends BuiltInProcessTableFunction<RowData> {
@@ -65,10 +57,8 @@ public class ToChangelogFunction extends
BuiltInProcessTableFunction<RowData> {
RowKind.DELETE, "DELETE");
private final Map<RowKind, String> rawOpMap;
- private final int[] nonPartitionIndices;
private transient Map<RowKind, StringData> opMap;
- private transient ProjectedRowData projectedInput;
private transient GenericRowData opRow;
private transient JoinedRowData output;
@@ -77,18 +67,6 @@ public class ToChangelogFunction extends
BuiltInProcessTableFunction<RowData> {
super(BuiltInFunctionDefinitions.TO_CHANGELOG, context);
final CallContext callContext = context.getCallContext();
- final TableSemantics semantics =
- callContext
- .getTableSemantics(0)
- .orElseThrow(() -> new IllegalStateException("Table
argument expected."));
- final int[] partitionKeys = semantics.partitionByColumns();
- final Set<Integer> partitionKeySet =
-
IntStream.of(partitionKeys).boxed().collect(Collectors.toSet());
-
- final RowType inputType = (RowType)
semantics.dataType().getLogicalType();
- this.nonPartitionIndices =
- buildNonPartitionIndices(inputType.getFieldCount(),
partitionKeySet);
-
final Map<String, String> opMapping =
callContext.getArgumentValue(2, Map.class).orElse(null);
this.rawOpMap = buildOpMap(opMapping);
@@ -99,16 +77,10 @@ public class ToChangelogFunction extends
BuiltInProcessTableFunction<RowData> {
super.open(context);
opMap = new EnumMap<>(RowKind.class);
rawOpMap.forEach((kind, code) -> opMap.put(kind,
StringData.fromString(code)));
- projectedInput = ProjectedRowData.from(nonPartitionIndices);
opRow = new GenericRowData(1);
output = new JoinedRowData();
}
- private static int[] buildNonPartitionIndices(
- final int fieldCount, final Set<Integer> partitionKeySet) {
- return IntStream.range(0, fieldCount).filter(i ->
!partitionKeySet.contains(i)).toArray();
- }
-
/**
* Builds a RowKind-to-output-code map. Keys may be comma-separated (e.g.,
"INSERT,
* UPDATE_AFTER") to map multiple RowKinds to the same output code.
@@ -138,7 +110,6 @@ public class ToChangelogFunction extends
BuiltInProcessTableFunction<RowData> {
}
opRow.setField(0, opCode);
- projectedInput.replaceRow(input);
- collect(output.replace(opRow, projectedInput));
+ collect(output.replace(opRow, input));
}
}