This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch release-2.3 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 87662967e79ad89e7524c89b45ca8c96adea0a8f Author: Ramin Gharib <[email protected]> AuthorDate: Tue Apr 21 13:57:18 2026 +0200 [FLINK-39479][python] Add fromChangelog() to Python Table API This closes #27975. (cherry picked from commit 9edb0f5bc439519c3128af5670e4c4c3af1b4910) --- flink-python/pyflink/table/table.py | 42 ++++++++++++++++++++-- .../pyflink/table/tests/test_table_completeness.py | 1 - .../java/org/apache/flink/table/api/Table.java | 36 ++++++++++++------- 3 files changed, 62 insertions(+), 17 deletions(-) diff --git a/flink-python/pyflink/table/table.py b/flink-python/pyflink/table/table.py index b76918083e5..ac54022a91a 100644 --- a/flink-python/pyflink/table/table.py +++ b/flink-python/pyflink/table/table.py @@ -1201,14 +1201,14 @@ class Table(object): >>> from pyflink.table.expressions import descriptor, map_ >>> # Default: adds 'op' column with standard change operation names - >>> table.to_changelog() + >>> result = table.to_changelog() >>> # Custom op column name and mapping - >>> table.to_changelog( + >>> result = table.to_changelog( ... descriptor("op_code").as_argument("op"), ... map_("INSERT", "I", "UPDATE_AFTER", "U").as_argument("op_mapping") ... ) >>> # Deletion flag pattern - >>> table.to_changelog( + >>> result = table.to_changelog( ... descriptor("deleted").as_argument("op"), ... map_("INSERT, UPDATE_AFTER", "false", ... "DELETE", "true").as_argument("op_mapping") @@ -1220,6 +1220,42 @@ class Table(object): """ return Table(self._j_table.toChangelog(to_expression_jarray(arguments)), self._t_env) + def from_changelog(self, *arguments: Expression) -> 'Table': + """ + Converts this append-only table with an explicit operation code column into a + (potentially updating) dynamic table. Each input row is expected to have a string + column that indicates the change operation. The operation column is interpreted by + the engine and removed from the output. + + The operation code column defaults to ``op``. By default, the codes ``INSERT``, + ``UPDATE_BEFORE``, ``UPDATE_AFTER``, and ``DELETE`` are recognized; pass + ``op_mapping`` to use custom codes. + + Example: + :: + + >>> from pyflink.table.expressions import descriptor, map_ + >>> # Default: reads 'op' column with standard change operation names + >>> result = cdc_stream.from_changelog() + >>> # With custom op column name + >>> result = cdc_stream.from_changelog( + ... descriptor("operation").as_argument("op") + ... ) + >>> # With custom op_mapping + >>> result = cdc_stream.from_changelog( + ... descriptor("op").as_argument("op"), + ... map_("c, r", "INSERT", + ... "ub", "UPDATE_BEFORE", + ... "ua", "UPDATE_AFTER", + ... "d", "DELETE").as_argument("op_mapping") + ... ) + + :param arguments: Optional named arguments for ``op`` and ``op_mapping``. + :return: A dynamic :class:`~pyflink.table.Table` with the ``op`` column removed and + proper change operation semantics. + """ + return Table(self._j_table.fromChangelog(to_expression_jarray(arguments)), self._t_env) + @PublicEvolving() class GroupedTable(object): diff --git a/flink-python/pyflink/table/tests/test_table_completeness.py b/flink-python/pyflink/table/tests/test_table_completeness.py index 1f2b920474f..feca7e63b1a 100644 --- a/flink-python/pyflink/table/tests/test_table_completeness.py +++ b/flink-python/pyflink/table/tests/test_table_completeness.py @@ -42,7 +42,6 @@ class TableAPICompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCase): 'asArgument', 'process', 'partitionBy', - 'fromChangelog', } @classmethod diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java index 172adfe1354..5189f60dfe8 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java @@ -1435,16 +1435,16 @@ public interface Table extends Explainable<Table>, Executable { * * <pre>{@code * // Default: adds 'op' column and supports all changelog modes - * table.toChangelog(); + * Table result = table.toChangelog(); * * // Custom op column name and mapping - * table.toChangelog( + * Table result = table.toChangelog( * descriptor("op_code").asArgument("op"), * map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping") * ); * * // Deletion flag pattern: comma-separated keys map multiple change operations to the same code - * table.toChangelog( + * Table result = table.toChangelog( * descriptor("deleted").asArgument("op"), * map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping") * ); @@ -1456,23 +1456,33 @@ public interface Table extends Explainable<Table>, Executable { Table toChangelog(Expression... arguments); /** - * Converts this append-only table with an explicit operation code column into a dynamic table - * using the built-in {@code FROM_CHANGELOG} process table function. + * Converts this append-only table with an explicit operation code column into a (potentially + * updating) dynamic table. Each input row is expected to have a string column that indicates + * the change operation. The operation column is interpreted by the engine and removed from the + * output. * - * <p>Each input row is expected to have a string operation code column (default: {@code "op"}) - * that indicates the change operation (e.g., INSERT, UPDATE_AFTER, UPDATE_BEFORE, DELETE). The - * output table is a dynamic table backed by a changelog stream. + * <p>The operation code column defaults to {@code op}. By default, the codes {@code INSERT}, + * {@code UPDATE_BEFORE}, {@code UPDATE_AFTER}, and {@code DELETE} are recognized; pass {@code + * op_mapping} to use custom codes. * * <p>Optional arguments can be passed using named expressions: * * <pre>{@code * // Default: reads 'op' column with standard change operation names - * table.fromChangelog(); + * Table result = cdcStream.fromChangelog(); * - * // Custom op column name and mapping (Debezium-style codes) - * table.fromChangelog( - * descriptor("__op").asArgument("op"), - * map("c, r", "INSERT", "u", "UPDATE_AFTER", "d", "DELETE").asArgument("op_mapping") + * // With custom op column name + * Table result = cdcStream.fromChangelog( + * descriptor("operation").asArgument("op") + * ); + * + * // With custom op_mapping + * Table result = cdcStream.fromChangelog( + * descriptor("op").asArgument("op"), + * map("c, r", "INSERT", + * "ub", "UPDATE_BEFORE", + * "ua", "UPDATE_AFTER", + * "d", "DELETE").asArgument("op_mapping") * ); * }</pre> *
