This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-2.3
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.3 by this push:
     new 9f19743deb4 [FLINK-39442][python] Add descriptor() and to_changelog() 
to Python Table API
9f19743deb4 is described below

commit 9f19743deb4eb1eb6424150eb7d499c7da3cb17b
Author: Gustavo de Morais <[email protected]>
AuthorDate: Fri Apr 17 17:20:49 2026 +0200

    [FLINK-39442][python] Add descriptor() and to_changelog() to Python Table 
API
    
    This closes #27953.
---
 flink-python/pyflink/table/expressions.py          | 22 ++++++++++++++-
 flink-python/pyflink/table/table.py                | 33 ++++++++++++++++++++++
 .../pyflink/table/tests/test_table_completeness.py |  1 -
 3 files changed, 54 insertions(+), 2 deletions(-)

diff --git a/flink-python/pyflink/table/expressions.py 
b/flink-python/pyflink/table/expressions.py
index daf73ae0c21..7f0fe4956c9 100644
--- a/flink-python/pyflink/table/expressions.py
+++ b/flink-python/pyflink/table/expressions.py
@@ -34,7 +34,7 @@ __all__ = ['if_then_else', 'lit', 'col', 'range_', 'and_', 
'or_', 'not_', 'UNBOU
            'concat_ws', 'uuid', 'null_of', 'log', 'with_columns', 
'without_columns', 'json',
            'json_string', 'json_object', 'json_object_agg', 'json_array', 
'json_array_agg',
            'call', 'call_sql', 'source_watermark', 'to_timestamp_ltz', 
'from_unixtime', 'to_date',
-           'to_timestamp', 'convert_tz', 'unix_timestamp']
+           'to_timestamp', 'convert_tz', 'unix_timestamp', 'descriptor']
 
 
 def _leaf_op(op_name: str) -> Expression:
@@ -578,6 +578,26 @@ def map_from_arrays(key, value) -> Expression:
     return _binary_op("mapFromArrays", key, value)
 
 
+@PublicEvolving()
+def descriptor(*column_names: str) -> Expression:
+    """
+    Creates a literal describing an arbitrary, unvalidated list of column 
names.
+
+    Passing a list of columns can be useful for parameterizing a function. In 
particular,
+    it enables declaring the ``on_time`` argument for process table functions.
+
+    Example:
+    ::
+
+        >>> descriptor("ts_column")
+        >>> descriptor("col1", "col2")
+
+    :param column_names: One or more column names.
+    :return: A descriptor expression.
+    """
+    return _varargs_op("descriptor", *column_names)
+
+
 @PublicEvolving()
 def object_of(class_name: str, *args) -> Expression:
     """
diff --git a/flink-python/pyflink/table/table.py 
b/flink-python/pyflink/table/table.py
index 285dd9d811c..b76918083e5 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -1187,6 +1187,39 @@ class Table(object):
                 t_env=self._t_env,
             )
 
+    def to_changelog(self, *arguments: Expression) -> 'Table':
+        """
+        Converts this table into an append-only table with an explicit 
operation code
+        column using the built-in ``TO_CHANGELOG`` process table function.
+
+        Each input row - regardless of its original change operation - is 
emitted as an
+        INSERT-only row with a string ``op`` column indicating the original 
operation
+        (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE).
+
+        Example:
+        ::
+
+            >>> from pyflink.table.expressions import descriptor, map_
+            >>> # Default: adds 'op' column with standard change operation 
names
+            >>> table.to_changelog()
+            >>> # Custom op column name and mapping
+            >>> table.to_changelog(
+            ...     descriptor("op_code").as_argument("op"),
+            ...     map_("INSERT", "I", "UPDATE_AFTER", 
"U").as_argument("op_mapping")
+            ... )
+            >>> # Deletion flag pattern
+            >>> table.to_changelog(
+            ...     descriptor("deleted").as_argument("op"),
+            ...     map_("INSERT, UPDATE_AFTER", "false",
+            ...          "DELETE", "true").as_argument("op_mapping")
+            ... )
+
+        :param arguments: Optional named arguments for ``op`` and 
``op_mapping``.
+        :return: An append-only :class:`~pyflink.table.Table` with an ``op`` 
column prepended
+                 to the input columns.
+        """
+        return 
Table(self._j_table.toChangelog(to_expression_jarray(arguments)), self._t_env)
+
 
 @PublicEvolving()
 class GroupedTable(object):
diff --git a/flink-python/pyflink/table/tests/test_table_completeness.py 
b/flink-python/pyflink/table/tests/test_table_completeness.py
index 21bb038aef3..feca7e63b1a 100644
--- a/flink-python/pyflink/table/tests/test_table_completeness.py
+++ b/flink-python/pyflink/table/tests/test_table_completeness.py
@@ -42,7 +42,6 @@ class 
TableAPICompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCase):
             'asArgument',
             'process',
             'partitionBy',
-            'toChangelog',
         }
 
     @classmethod

Reply via email to