This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-2.3
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.3 by this push:
new 9f19743deb4 [FLINK-39442][python] Add descriptor() and to_changelog()
to Python Table API
9f19743deb4 is described below
commit 9f19743deb4eb1eb6424150eb7d499c7da3cb17b
Author: Gustavo de Morais <[email protected]>
AuthorDate: Fri Apr 17 17:20:49 2026 +0200
[FLINK-39442][python] Add descriptor() and to_changelog() to Python Table
API
This closes #27953.
---
flink-python/pyflink/table/expressions.py | 22 ++++++++++++++-
flink-python/pyflink/table/table.py | 33 ++++++++++++++++++++++
.../pyflink/table/tests/test_table_completeness.py | 1 -
3 files changed, 54 insertions(+), 2 deletions(-)
diff --git a/flink-python/pyflink/table/expressions.py
b/flink-python/pyflink/table/expressions.py
index daf73ae0c21..7f0fe4956c9 100644
--- a/flink-python/pyflink/table/expressions.py
+++ b/flink-python/pyflink/table/expressions.py
@@ -34,7 +34,7 @@ __all__ = ['if_then_else', 'lit', 'col', 'range_', 'and_',
'or_', 'not_', 'UNBOU
'concat_ws', 'uuid', 'null_of', 'log', 'with_columns',
'without_columns', 'json',
'json_string', 'json_object', 'json_object_agg', 'json_array',
'json_array_agg',
'call', 'call_sql', 'source_watermark', 'to_timestamp_ltz',
'from_unixtime', 'to_date',
- 'to_timestamp', 'convert_tz', 'unix_timestamp']
+ 'to_timestamp', 'convert_tz', 'unix_timestamp', 'descriptor']
def _leaf_op(op_name: str) -> Expression:
@@ -578,6 +578,26 @@ def map_from_arrays(key, value) -> Expression:
return _binary_op("mapFromArrays", key, value)
+@PublicEvolving()
+def descriptor(*column_names: str) -> Expression:
+ """
+ Creates a literal describing an arbitrary, unvalidated list of column
names.
+
+ Passing a list of columns can be useful for parameterizing a function. In
particular,
+ it enables declaring the ``on_time`` argument for process table functions.
+
+ Example:
+ ::
+
+ >>> descriptor("ts_column")
+ >>> descriptor("col1", "col2")
+
+ :param column_names: One or more column names.
+ :return: A descriptor expression.
+ """
+ return _varargs_op("descriptor", *column_names)
+
+
@PublicEvolving()
def object_of(class_name: str, *args) -> Expression:
"""
diff --git a/flink-python/pyflink/table/table.py
b/flink-python/pyflink/table/table.py
index 285dd9d811c..b76918083e5 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -1187,6 +1187,39 @@ class Table(object):
t_env=self._t_env,
)
+ def to_changelog(self, *arguments: Expression) -> 'Table':
+ """
+ Converts this table into an append-only table with an explicit
operation code
+ column using the built-in ``TO_CHANGELOG`` process table function.
+
+ Each input row - regardless of its original change operation - is
emitted as an
+ INSERT-only row with a string ``op`` column indicating the original
operation
+ (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE).
+
+ Example:
+ ::
+
+ >>> from pyflink.table.expressions import descriptor, map_
+ >>> # Default: adds 'op' column with standard change operation
names
+ >>> table.to_changelog()
+ >>> # Custom op column name and mapping
+ >>> table.to_changelog(
+ ... descriptor("op_code").as_argument("op"),
+ ... map_("INSERT", "I", "UPDATE_AFTER",
"U").as_argument("op_mapping")
+ ... )
+ >>> # Deletion flag pattern
+ >>> table.to_changelog(
+ ... descriptor("deleted").as_argument("op"),
+ ... map_("INSERT, UPDATE_AFTER", "false",
+ ... "DELETE", "true").as_argument("op_mapping")
+ ... )
+
+ :param arguments: Optional named arguments for ``op`` and
``op_mapping``.
+ :return: An append-only :class:`~pyflink.table.Table` with an ``op``
column prepended
+ to the input columns.
+ """
+ return
Table(self._j_table.toChangelog(to_expression_jarray(arguments)), self._t_env)
+
@PublicEvolving()
class GroupedTable(object):
diff --git a/flink-python/pyflink/table/tests/test_table_completeness.py
b/flink-python/pyflink/table/tests/test_table_completeness.py
index 21bb038aef3..feca7e63b1a 100644
--- a/flink-python/pyflink/table/tests/test_table_completeness.py
+++ b/flink-python/pyflink/table/tests/test_table_completeness.py
@@ -42,7 +42,6 @@ class
TableAPICompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCase):
'asArgument',
'process',
'partitionBy',
- 'toChangelog',
}
@classmethod