This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 9edb0f5bc43 [FLINK-39479][python] Add fromChangelog() to Python Table API
9edb0f5bc43 is described below
commit 9edb0f5bc439519c3128af5670e4c4c3af1b4910
Author: Ramin Gharib <[email protected]>
AuthorDate: Tue Apr 21 13:57:18 2026 +0200
[FLINK-39479][python] Add fromChangelog() to Python Table API
This closes #27975.
---
flink-python/pyflink/table/table.py | 42 ++++++++++++++++++++--
.../pyflink/table/tests/test_table_completeness.py | 1 -
.../java/org/apache/flink/table/api/Table.java | 36 ++++++++++++-------
3 files changed, 62 insertions(+), 17 deletions(-)
diff --git a/flink-python/pyflink/table/table.py b/flink-python/pyflink/table/table.py
index b76918083e5..ac54022a91a 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -1201,14 +1201,14 @@ class Table(object):
>>> from pyflink.table.expressions import descriptor, map_
>>> # Default: adds 'op' column with standard change operation names
- >>> table.to_changelog()
+ >>> result = table.to_changelog()
>>> # Custom op column name and mapping
- >>> table.to_changelog(
+ >>> result = table.to_changelog(
... descriptor("op_code").as_argument("op"),
... map_("INSERT", "I", "UPDATE_AFTER", "U").as_argument("op_mapping")
... )
>>> # Deletion flag pattern
- >>> table.to_changelog(
+ >>> result = table.to_changelog(
... descriptor("deleted").as_argument("op"),
... map_("INSERT, UPDATE_AFTER", "false",
... "DELETE", "true").as_argument("op_mapping")
@@ -1220,6 +1220,42 @@ class Table(object):
"""
return Table(self._j_table.toChangelog(to_expression_jarray(arguments)), self._t_env)
+ def from_changelog(self, *arguments: Expression) -> 'Table':
+ """
+ Converts this append-only table with an explicit operation code column into a
+ (potentially updating) dynamic table. Each input row is expected to have a string
+ column that indicates the change operation. The operation column is interpreted by
+ the engine and removed from the output.
+
+ The operation code column defaults to ``op``. By default, the codes ``INSERT``,
+ ``UPDATE_BEFORE``, ``UPDATE_AFTER``, and ``DELETE`` are recognized; pass
+ ``op_mapping`` to use custom codes.
+
+ Example:
+ ::
+
+ >>> from pyflink.table.expressions import descriptor, map_
+ >>> # Default: reads 'op' column with standard change operation names
+ >>> result = cdc_stream.from_changelog()
+ >>> # With custom op column name
+ >>> result = cdc_stream.from_changelog(
+ ... descriptor("operation").as_argument("op")
+ ... )
+ >>> # With custom op_mapping
+ >>> result = cdc_stream.from_changelog(
+ ... descriptor("op").as_argument("op"),
+ ... map_("c, r", "INSERT",
+ ... "ub", "UPDATE_BEFORE",
+ ... "ua", "UPDATE_AFTER",
+ ... "d", "DELETE").as_argument("op_mapping")
+ ... )
+
+ :param arguments: Optional named arguments for ``op`` and ``op_mapping``.
+ :return: A dynamic :class:`~pyflink.table.Table` with the ``op`` column removed and
+ proper change operation semantics.
+ """
+ return Table(self._j_table.fromChangelog(to_expression_jarray(arguments)), self._t_env)
+
@PublicEvolving()
class GroupedTable(object):
diff --git a/flink-python/pyflink/table/tests/test_table_completeness.py b/flink-python/pyflink/table/tests/test_table_completeness.py
index 1f2b920474f..feca7e63b1a 100644
--- a/flink-python/pyflink/table/tests/test_table_completeness.py
+++ b/flink-python/pyflink/table/tests/test_table_completeness.py
@@ -42,7 +42,6 @@ class TableAPICompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCase):
'asArgument',
'process',
'partitionBy',
- 'fromChangelog',
}
@classmethod
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index 172adfe1354..5189f60dfe8 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -1435,16 +1435,16 @@ public interface Table extends Explainable<Table>, Executable {
*
* <pre>{@code
* // Default: adds 'op' column and supports all changelog modes
- * table.toChangelog();
+ * Table result = table.toChangelog();
*
* // Custom op column name and mapping
- * table.toChangelog(
+ * Table result = table.toChangelog(
* descriptor("op_code").asArgument("op"),
* map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
* );
*
* // Deletion flag pattern: comma-separated keys map multiple change operations to the same code
- * table.toChangelog(
+ * Table result = table.toChangelog(
* descriptor("deleted").asArgument("op"),
* map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping")
* );
@@ -1456,23 +1456,33 @@ public interface Table extends Explainable<Table>, Executable {
Table toChangelog(Expression... arguments);
/**
- * Converts this append-only table with an explicit operation code column into a dynamic table
- * using the built-in {@code FROM_CHANGELOG} process table function.
+ * Converts this append-only table with an explicit operation code column into a (potentially
+ * updating) dynamic table. Each input row is expected to have a string column that indicates
+ * the change operation. The operation column is interpreted by the engine and removed from the
+ * output.
*
- * <p>Each input row is expected to have a string operation code column (default: {@code "op"})
- * that indicates the change operation (e.g., INSERT, UPDATE_AFTER, UPDATE_BEFORE, DELETE). The
- * output table is a dynamic table backed by a changelog stream.
+ * <p>The operation code column defaults to {@code op}. By default, the codes {@code INSERT},
+ * {@code UPDATE_BEFORE}, {@code UPDATE_AFTER}, and {@code DELETE} are recognized; pass {@code
+ * op_mapping} to use custom codes.
*
* <p>Optional arguments can be passed using named expressions:
*
* <pre>{@code
* // Default: reads 'op' column with standard change operation names
- * table.fromChangelog();
+ * Table result = cdcStream.fromChangelog();
*
- * // Custom op column name and mapping (Debezium-style codes)
- * table.fromChangelog(
- * descriptor("__op").asArgument("op"),
- * map("c, r", "INSERT", "u", "UPDATE_AFTER", "d", "DELETE").asArgument("op_mapping")
+ * // With custom op column name
+ * Table result = cdcStream.fromChangelog(
+ * descriptor("operation").asArgument("op")
+ * );
+ *
+ * // With custom op_mapping
+ * Table result = cdcStream.fromChangelog(
+ * descriptor("op").asArgument("op"),
+ * map("c, r", "INSERT",
+ * "ub", "UPDATE_BEFORE",
+ * "ua", "UPDATE_AFTER",
+ * "d", "DELETE").asArgument("op_mapping")
* );
* }</pre>
*