szehon-ho commented on code in PR #56069:
URL: https://github.com/apache/spark/pull/56069#discussion_r3295048497


##########
python/pyspark/pipelines/api.py:
##########
@@ -525,3 +526,192 @@ def create_sink(
         comment=None,
     )
     get_active_graph_element_registry().register_output(sink)
+
+
+def create_auto_cdc_flow(
+    target: str,
+    source: str,
+    keys: Union[List[str], List[Column]],
+    sequence_by: Union[str, Column],
+    apply_as_deletes: Optional[Union[str, Column]] = None,
+    column_list: Optional[Union[List[str], List[Column]]] = None,
+    except_column_list: Optional[Union[List[str], List[Column]]] = None,
+    stored_as_scd_type: Optional[Literal[1, "1"]] = None,
+    name: Optional[str] = None,
+) -> None:
+    """
+    Create an Auto CDC flow into the target table from the Change Data Capture 
(CDC) source.
+    Target table must have already been created using create_streaming_table 
function. Only one
+    of column_list and except_column_list can be specified.
+
+    Example:
+        create_auto_cdc_flow(
+            target="target",
+            source="source",
+            keys=["key"],
+            sequence_by="sequence_expr",
+            column_list=["key", "value"],
+        )
+
+    Note that for keys, sequence_by, column_list, and except_column_list the 
arguments have to
+    be column identifiers without qualifiers, e.g. they cannot be 
col("sourceTable.keyId").
+
+    :param target: The name of the target table that receives the Auto CDC 
flow.
+    :param source: The name of the CDC source to stream from.
+    :param keys: The column or combination of columns that uniquely identify a 
row in the source \
+        data. This is used to identify which CDC events apply to specific 
records in the target \
+        table. These keys also identify records in the target table, e.g., if 
there exists a record \
+        for given keys and the CDC source has an UPSERT operation for the same 
keys, we will update \
+        the existing record. At least one key must be provided. This should be 
a list of column \
+        identifiers without qualifiers, expressed as either Python strings or 
PySpark Columns.
+    :param sequence_by: An expression that we use to order the source data. 
This can be expressed \
+        as either a SQL expression string or a PySpark Column.
+    :param apply_as_deletes: Delete condition for the merged operation. This 
should be a string of \
+        expression e.g. "operation = 'DELETE'"

Review Comment:
   nit: should align with the other docstrings:  `either a SQL expression 
string or a PySpark Column`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to