twalthr commented on code in PR #27901:
URL: https://github.com/apache/flink/pull/27901#discussion_r3100870968


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/ChangelogModeStrategy.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.functions.ChangelogFunction.ChangelogContext;
+
+/**
+ * Strategy for determining the output {@link ChangelogMode} of a built-in 
process table function.
+ *
+ * <p>Similar to {@link TypeStrategy}, this is used to declare changelog 
semantics in the function
+ * definition rather than implementing the {@link
+ * org.apache.flink.table.functions.ChangelogFunction} interface directly.
+ */
+@Experimental

Review Comment:
   ```suggestion
   @Internal
   ```



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -30,9 +30,117 @@ Flink SQL provides built-in process table functions (PTFs) 
for working with chan
 
 | Function | Description |
 |:---------|:------------|
+| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with 
operation codes into a (potentially updating) dynamic table |
 | [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only 
table with explicit operation codes |
 
-<!-- Placeholder for future FROM_CHANGELOG function -->
+## FROM_CHANGELOG
+
+The `FROM_CHANGELOG` PTF converts an append-only table with an explicit 
operation code column into a (potentially updating) dynamic table. Each input 
row is expected to have a string column that indicates the change operation. 
The operation column is interpreted by the engine and removed from the output.
+
+This is useful when consuming Change Data Capture (CDC) streams from systems 
like Debezium where events arrive as flat append-only records with an explicit 
operation field. It's also useful to be used in combination with the 
TO_CHANGELOG function, when converting the append-only table back into an 
updating table after doing some specific transformation to the events.
+
+Note: This version requires that your CDC data encodes updates using a full 
image (i.e. providing separate events for before and after the update). Please 
double-check whether your source provides both UPDATE_BEFORE and UPDATE_AFTER 
events. FROM_CHANGELOG is a very powerful function but might produce incorrect 
results in subsequent operations and tables, if not configured correctly.
+
+### Syntax
+
+```sql
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE source_table,
+  [op => DESCRIPTOR(op_column_name),]
+  [op_mapping => MAP[
+      'c, r', 'INSERT',
+      'ub', 'UPDATE_BEFORE',
+      'ua', 'UPDATE_AFTER',
+      'd', 'DELETE'
+  ]]
+)
+```
+
+### Parameters
+
+| Parameter    | Required | Description                                        
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                             |
+|:-------------|:---------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `input`      | Yes      | The input table. Must be append-only.              
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                             |
+| `op`         | No       | A `DESCRIPTOR` with a single column name for the 
operation code column. Defaults to `op`. The column must exist in the input 
table and be of type STRING.                                                    
                                                                                
                                                                                
                                                                                
                                   |
+| `op_mapping` | No       | A `MAP<STRING, STRING>` mapping user-defined codes 
to Flink change operation names. Keys are user-defined codes (e.g., `'c'`, 
`'u'`, `'d'`), values are Flink change operation names (`INSERT`, 
`UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated 
codes to map multiple codes to the same operation (e.g., `'c, r'`). When 
provided, only mapped codes are forwarded - unmapped codes are dropped. Each 
change operation may appear at most once across all entries. |
+
+#### Default op_mapping
+
+When `op_mapping` is omitted, the following standard names are used. They 
allow a reverse conversion from TO_CHANGELOG by default.
+
+| Input code         | Change operation  |
+|:-------------------|:------------------|
+| `'INSERT'`         | INSERT            |
+| `'UPDATE_BEFORE'`  | UPDATE_BEFORE     |
+| `'UPDATE_AFTER'`   | UPDATE_AFTER      |
+| `'DELETE'`         | DELETE            |
+
+### Output Schema
+
+The output contains all input columns except the operation code (e.g., op) 
column, which is interpreted by Flink's SQL engine and removed. Each output row 
carries the appropriate change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, 
or DELETE).
+
+```
+[all_input_columns_without_op]
+```
+
+### Examples
+
+#### Basic usage with standard op names
+
+```sql
+-- Input (append-only):
+-- +I[id:1, op:'INSERT',        name:'Alice']
+-- +I[id:1, op:'UPDATE_BEFORE', name:'Alice']
+-- +I[id:1, op:'UPDATE_AFTER',  name:'Alice2']
+-- +I[id:2, op:'DELETE',        name:'Bob']
+
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE cdc_stream
+)
+
+-- Output (updating table):
+-- +I[id:1, name:'Alice']
+-- -U[id:1, name:'Alice']
+-- +U[id:1, name:'Alice2']
+-- -D[id:2, name:'Bob']
+
+-- Table state after all events:
+-- | id | name   |
+-- |----|--------|
+-- | 1  | Alice2 |
+```
+
+#### Custom operation column name
+
+```sql
+-- Source schema: id INT, operation STRING, name STRING
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE cdc_stream,
+  op => DESCRIPTOR(operation)
+)
+-- The operation column named 'operation' is used instead of 'op'
+```
+
+#### Table API
+
+```java
+// Default: reads 'op' column with standard change operation names
+Table result = cdcStream.fromChangelog();

Review Comment:
   Add this to the beginning to not confuse people with DataStream API.
   ```suggestion
   Table cdcStream = ...;
   ```



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+/** {@link TableTestProgram} definitions for testing the built-in 
FROM_CHANGELOG PTF. */
+public class FromChangelogTestPrograms {
+
+    private static final SourceTestStep SIMPLE_CDC_SOURCE =
+            SourceTestStep.newBuilder("cdc_stream")
+                    .addSchema("id INT", "op STRING", "name STRING")
+                    .producedValues(Row.of(1, "INSERT", "Alice"))
+                    .build();
+
+    // 
--------------------------------------------------------------------------------------------
+    // SQL tests
+    // 
--------------------------------------------------------------------------------------------
+
+    public static final TableTestProgram DEFAULT_OP_MAPPING =
+            TableTestProgram.of(
+                            "from-changelog-default-op-mapping",
+                            "default mapping with standard op names")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("cdc_stream")
+                                    .addSchema("id INT", "op STRING", "name 
STRING")
+                                    .producedValues(
+                                            Row.of(1, "INSERT", "Alice"),
+                                            Row.of(2, "INSERT", "Bob"),
+                                            Row.of(1, "UPDATE_BEFORE", 
"Alice"),
+                                            Row.of(1, "UPDATE_AFTER", 
"Alice2"),
+                                            Row.of(2, "DELETE", "Bob"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema(
+                                            "id INT NOT NULL",
+                                            "name STRING",
+                                            "PRIMARY KEY (id) NOT ENFORCED")

Review Comment:
   primary key is not really required, might just confuse for retract sinks. 
remove everywhere in this class?
   ```suggestion
                                               " ")
   ```



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+/** {@link TableTestProgram} definitions for testing the built-in 
FROM_CHANGELOG PTF. */
+public class FromChangelogTestPrograms {
+
+    private static final SourceTestStep SIMPLE_CDC_SOURCE =

Review Comment:
   Unused. Instead, store a `SIMPLE_CDC_SCHEMA` and reuse it across test 
programs.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/ChangelogModeStrategy.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;

Review Comment:
   put it next to `BuiltInFunctionDefinitions`



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/ChangelogModeStrategy.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.functions.ChangelogFunction.ChangelogContext;
+
+/**
+ * Strategy for determining the output {@link ChangelogMode} of a built-in 
process table function.
+ *
+ * <p>Similar to {@link TypeStrategy}, this is used to declare changelog 
semantics in the function
+ * definition rather than implementing the {@link
+ * org.apache.flink.table.functions.ChangelogFunction} interface directly.

Review Comment:
   ```suggestion
    * ChangelogFunction} interface directly.
   ```



##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml:
##########
@@ -123,6 +123,17 @@ LogicalProject(out=[$0], rowtime=[$1])
 ProcessTableFunction(invocation=[f(TABLE(#0), DESCRIPTOR(_UTF-16LE'ts'), 
DEFAULT())], uid=[null], select=[out,rowtime], 
rowType=[RecordType(VARCHAR(2147483647) out, TIMESTAMP_LTZ(3) *ROWTIME* 
rowtime)])
 +- WatermarkAssigner(rowtime=[ts], watermark=[ts])
    +- TableSourceScan(table=[[default_catalog, default_database, 
t_watermarked]], fields=[name, score, ts])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRetractModeWithRowSemantics">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM f(r => TABLE t)]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+ProcessTableFunction(invocation=[f(TABLE(#0), DEFAULT(), DEFAULT())], 
uid=[null], select=[name,count,mode], rowType=[RecordType(VARCHAR(2147483647) 
name, BIGINT count, VARCHAR(2147483647) mode)])
++- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]])

Review Comment:
   adjust test to also print the changelog mode here



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala:
##########
@@ -1711,24 +1711,44 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
         val changelogContext =
           toPtfChangelogContext(process, inputChangelogModes, 
requiredChangelogMode)
         val changelogMode = 
changelogFunction.getChangelogMode(changelogContext)
-        if (!changelogMode.containsOnly(RowKind.INSERT)) {
-          verifyPtfTableArgsForUpdates(call)
-        }
+        verifyPtfTableArgsForUpdates(call, changelogMode)
+        toTraitSet(changelogMode)
+      case builtIn: BuiltInFunctionDefinition if 
builtIn.getChangelogModeStrategy.isPresent =>
+        val inputChangelogModes = children.map(toChangelogMode(_, None, None))
+        val changelogContext =
+          toPtfChangelogContext(process, inputChangelogModes, 
requiredChangelogMode)
+        val changelogMode =
+          
builtIn.getChangelogModeStrategy.get().inferChangelogMode(changelogContext)
+        verifyPtfTableArgsForUpdates(call, changelogMode)
         toTraitSet(changelogMode)
       case _ =>
         defaultTraitSet
     }
   }
 
-  private def verifyPtfTableArgsForUpdates(call: RexCall): Unit = {
+  /**
+   * Verifies that PTFs with upsert output (without UPDATE_BEFORE) use set 
semantics.
+   *
+   * Retract mode (with UPDATE_BEFORE) is self-describing — each update 
carries both the old and new

Review Comment:
   ```suggestion
      * Retract mode (with UPDATE_BEFORE) is self-describing — each update 
carries either the old and new
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to