AnishMahto commented on code in PR #56045:
URL: https://github.com/apache/spark/pull/56045#discussion_r3284708146
##########
python/pyspark/pipelines/tests/test_graph_element_registry.py:
##########
@@ -97,6 +98,83 @@ def flow2():
self.assertEqual(sink_obj.options["key1"], "value1")
assert sink_obj.source_code_location.filename.endswith("test_graph_element_registry.py")
+ def test_create_auto_cdc_flow(self):
Review Comment:
non-blocking nit: This test can just be collapsed into `test_create_auto_cdc_flow_with_all_args`.
##########
python/pyspark/pipelines/api.py:
##########
@@ -525,3 +527,136 @@ def create_sink(
comment=None,
)
get_active_graph_element_registry().register_output(sink)
+
+
+def create_auto_cdc_flow(
+ target: str,
+ source: str,
+ keys: Union[List[str], List[Column]],
+ sequence_by: Union[str, Column],
+ apply_as_deletes: Optional[Union[str, Column]] = None,
+ apply_as_truncates: Optional[Union[str, Column]] = None,
+ column_list: Optional[Union[List[str], List[Column]]] = None,
+ except_column_list: Optional[Union[List[str], List[Column]]] = None,
+ stored_as_scd_type: Optional[Literal[1, "1"]] = None,
+ name: Optional[str] = None,
+ ignore_null_updates_column_list: Optional[Union[List[str], List[Column]]] = None,
+ ignore_null_updates_except_column_list: Optional[Union[List[str], List[Column]]] = None,
Review Comment:
Let's add these APIs later, once execution support for ignoring null updates is actually built.
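For reference, here is a minimal sketch of the signature with the two `ignore_null_updates_*` parameters dropped, as suggested above. It keeps only the parameters visible in the diff excerpt (the hunk is cut off after them, so any later parameters and the return type are not shown), elides the body, and imports `Column` only for type checkers so the stub runs without PySpark installed.

```python
from __future__ import annotations

import inspect
from typing import TYPE_CHECKING, List, Literal, Optional, Union

if TYPE_CHECKING:
    # Only needed by type checkers; with the __future__ import above,
    # annotations are stored as strings and never evaluated at runtime.
    from pyspark.sql import Column


def create_auto_cdc_flow(
    target: str,
    source: str,
    keys: Union[List[str], List[Column]],
    sequence_by: Union[str, Column],
    apply_as_deletes: Optional[Union[str, Column]] = None,
    apply_as_truncates: Optional[Union[str, Column]] = None,
    column_list: Optional[Union[List[str], List[Column]]] = None,
    except_column_list: Optional[Union[List[str], List[Column]]] = None,
    stored_as_scd_type: Optional[Literal[1, "1"]] = None,
    name: Optional[str] = None,
):
    """Signature sketch only; the real body (flow registration) is elided."""
    raise NotImplementedError


# Parameter names of the trimmed signature: no ignore_null_updates_* entries.
params = list(inspect.signature(create_auto_cdc_flow).parameters)
print(params)
```

Because both dropped parameters default to `None`, adding them back later should not break existing callers.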
##########
python/pyspark/pipelines/tests/test_graph_element_registry.py:
##########
@@ -97,6 +98,83 @@ def flow2():
self.assertEqual(sink_obj.options["key1"], "value1")
assert sink_obj.source_code_location.filename.endswith("test_graph_element_registry.py")
+ def test_create_auto_cdc_flow(self):
+ from pyspark.sql.connect.functions.builtin import col, expr
Review Comment:
non-blocking nit: these imports can just be lifted out of the individual tests.
##########
python/pyspark/pipelines/api.py:
##########
@@ -525,3 +527,136 @@ def create_sink(
comment=None,
)
get_active_graph_element_registry().register_output(sink)
+
+
+def create_auto_cdc_flow(
+ target: str,
+ source: str,
+ keys: Union[List[str], List[Column]],
+ sequence_by: Union[str, Column],
+ apply_as_deletes: Optional[Union[str, Column]] = None,
+ apply_as_truncates: Optional[Union[str, Column]] = None,
Review Comment:
Just a heads-up: there's a good chance we won't get `apply_as_truncates` functionality merged in time for the Spark 4.2 cut.
I'll most likely drop this argument when I connect these APIs to the graph registration context on the Spark Connect backend, and then add it back for Spark 4.3+.
No action needed on your side; just giving you the heads-up.
--
This is an automated message from the Apache Git Service.