This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9edb0f5bc43 [FLINK-39479][python] Add fromChangelog() to Python Table API
9edb0f5bc43 is described below

commit 9edb0f5bc439519c3128af5670e4c4c3af1b4910
Author: Ramin Gharib <[email protected]>
AuthorDate: Tue Apr 21 13:57:18 2026 +0200

    [FLINK-39479][python] Add fromChangelog() to Python Table API
    
    This closes #27975.
---
 flink-python/pyflink/table/table.py                | 42 ++++++++++++++++++++--
 .../pyflink/table/tests/test_table_completeness.py |  1 -
 .../java/org/apache/flink/table/api/Table.java     | 36 ++++++++++++-------
 3 files changed, 62 insertions(+), 17 deletions(-)

diff --git a/flink-python/pyflink/table/table.py b/flink-python/pyflink/table/table.py
index b76918083e5..ac54022a91a 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -1201,14 +1201,14 @@ class Table(object):
 
             >>> from pyflink.table.expressions import descriptor, map_
             >>> # Default: adds 'op' column with standard change operation 
names
-            >>> table.to_changelog()
+            >>> result = table.to_changelog()
             >>> # Custom op column name and mapping
-            >>> table.to_changelog(
+            >>> result = table.to_changelog(
             ...     descriptor("op_code").as_argument("op"),
             ...     map_("INSERT", "I", "UPDATE_AFTER", 
"U").as_argument("op_mapping")
             ... )
             >>> # Deletion flag pattern
-            >>> table.to_changelog(
+            >>> result = table.to_changelog(
             ...     descriptor("deleted").as_argument("op"),
             ...     map_("INSERT, UPDATE_AFTER", "false",
             ...          "DELETE", "true").as_argument("op_mapping")
@@ -1220,6 +1220,42 @@ class Table(object):
         """
         return 
Table(self._j_table.toChangelog(to_expression_jarray(arguments)), self._t_env)
 
+    def from_changelog(self, *arguments: Expression) -> 'Table':
+        """
+        Converts this append-only table with an explicit operation code column 
into a
+        (potentially updating) dynamic table. Each input row is expected to 
have a string
+        column that indicates the change operation. The operation column is 
interpreted by
+        the engine and removed from the output.
+
+        The operation code column defaults to ``op``. By default, the codes 
``INSERT``,
+        ``UPDATE_BEFORE``, ``UPDATE_AFTER``, and ``DELETE`` are recognized; 
pass
+        ``op_mapping`` to use custom codes.
+
+        Example:
+        ::
+
+            >>> from pyflink.table.expressions import descriptor, map_
+            >>> # Default: reads 'op' column with standard change operation 
names
+            >>> result = cdc_stream.from_changelog()
+            >>> # With custom op column name
+            >>> result = cdc_stream.from_changelog(
+            ...     descriptor("operation").as_argument("op")
+            ... )
+            >>> # With custom op_mapping
+            >>> result = cdc_stream.from_changelog(
+            ...     descriptor("op").as_argument("op"),
+            ...     map_("c, r", "INSERT",
+            ...          "ub", "UPDATE_BEFORE",
+            ...          "ua", "UPDATE_AFTER",
+            ...          "d", "DELETE").as_argument("op_mapping")
+            ... )
+
+        :param arguments: Optional named arguments for ``op`` and 
``op_mapping``.
+        :return: A dynamic :class:`~pyflink.table.Table` with the ``op`` 
column removed and
+                 proper change operation semantics.
+        """
+        return 
Table(self._j_table.fromChangelog(to_expression_jarray(arguments)), self._t_env)
+
 
 @PublicEvolving()
 class GroupedTable(object):
diff --git a/flink-python/pyflink/table/tests/test_table_completeness.py b/flink-python/pyflink/table/tests/test_table_completeness.py
index 1f2b920474f..feca7e63b1a 100644
--- a/flink-python/pyflink/table/tests/test_table_completeness.py
+++ b/flink-python/pyflink/table/tests/test_table_completeness.py
@@ -42,7 +42,6 @@ class TableAPICompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCase):
             'asArgument',
             'process',
             'partitionBy',
-            'fromChangelog',
         }
 
     @classmethod
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index 172adfe1354..5189f60dfe8 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -1435,16 +1435,16 @@ public interface Table extends Explainable<Table>, Executable {
      *
      * <pre>{@code
      * // Default: adds 'op' column and supports all changelog modes
-     * table.toChangelog();
+     * Table result = table.toChangelog();
      *
      * // Custom op column name and mapping
-     * table.toChangelog(
+     * Table result = table.toChangelog(
      *     descriptor("op_code").asArgument("op"),
      *     map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
      * );
      *
      * // Deletion flag pattern: comma-separated keys map multiple change 
operations to the same code
-     * table.toChangelog(
+     * Table result = table.toChangelog(
      *     descriptor("deleted").asArgument("op"),
      *     map("INSERT, UPDATE_AFTER", "false", "DELETE", 
"true").asArgument("op_mapping")
      * );
@@ -1456,23 +1456,33 @@ public interface Table extends Explainable<Table>, Executable {
     Table toChangelog(Expression... arguments);
 
     /**
-     * Converts this append-only table with an explicit operation code column 
into a dynamic table
-     * using the built-in {@code FROM_CHANGELOG} process table function.
+     * Converts this append-only table with an explicit operation code column 
into a (potentially
+     * updating) dynamic table. Each input row is expected to have a string 
column that indicates
+     * the change operation. The operation column is interpreted by the engine 
and removed from the
+     * output.
      *
-     * <p>Each input row is expected to have a string operation code column 
(default: {@code "op"})
-     * that indicates the change operation (e.g., INSERT, UPDATE_AFTER, 
UPDATE_BEFORE, DELETE). The
-     * output table is a dynamic table backed by a changelog stream.
+     * <p>The operation code column defaults to {@code op}. By default, the 
codes {@code INSERT},
+     * {@code UPDATE_BEFORE}, {@code UPDATE_AFTER}, and {@code DELETE} are 
recognized; pass {@code
+     * op_mapping} to use custom codes.
      *
      * <p>Optional arguments can be passed using named expressions:
      *
      * <pre>{@code
      * // Default: reads 'op' column with standard change operation names
-     * table.fromChangelog();
+     * Table result = cdcStream.fromChangelog();
      *
-     * // Custom op column name and mapping (Debezium-style codes)
-     * table.fromChangelog(
-     *     descriptor("__op").asArgument("op"),
-     *     map("c, r", "INSERT", "u", "UPDATE_AFTER", "d", 
"DELETE").asArgument("op_mapping")
+     * // With custom op column name
+     * Table result = cdcStream.fromChangelog(
+     *     descriptor("operation").asArgument("op")
+     * );
+     *
+     * // With custom op_mapping
+     * Table result = cdcStream.fromChangelog(
+     *     descriptor("op").asArgument("op"),
+     *     map("c, r", "INSERT",
+     *         "ub", "UPDATE_BEFORE",
+     *         "ua", "UPDATE_AFTER",
+     *         "d", "DELETE").asArgument("op_mapping")
      * );
      * }</pre>
      *

Reply via email to