AnishMahto commented on code in PR #56069:
URL: https://github.com/apache/spark/pull/56069#discussion_r3293650456


##########
python/pyspark/pipelines/spark_connect_graph_element_registry.py:
##########
@@ -133,6 +133,41 @@ def register_flow(self, flow: Flow) -> None:
         command.pipeline_command.define_flow.CopyFrom(inner_command)
         self._client.execute_command(command)
 
+    def register_auto_cdc_flow(self, flow: AutoCdcFlow) -> None:
+        from pyspark.sql.connect.column import Column as ConnectColumn
+
+        def to_plan(col: Column) -> Any:
+            return cast(ConnectColumn, col).to_plan(self._client)
+
+        def to_plans(cols: Optional[List[Column]]) -> list:
+            return [] if cols is None else [to_plan(c) for c in cols]
+
+        auto_cdc_details = pb2.PipelineCommand.DefineFlow.AutoCdcFlowDetails(
+            source=flow.source,
+            keys=to_plans(flow.keys),
+            sequence_by=to_plan(flow.sequence_by),
+            column_list=to_plans(flow.column_list),
+            except_column_list=to_plans(flow.except_column_list),
+        )
+        if flow.stored_as_scd_type is not None:
+            auto_cdc_details.stored_as_scd_type = 
pb2.PipelineCommand.DefineFlow.SCDType.SCD_TYPE_1
+        if flow.apply_as_deletes is not None:
+            
auto_cdc_details.apply_as_deletes.CopyFrom(to_plan(flow.apply_as_deletes))
+
+        inner_command = pb2.PipelineCommand.DefineFlow(
+            dataflow_graph_id=self._dataflow_graph_id,
+            target_dataset_name=flow.target,
+            auto_cdc_flow_details=auto_cdc_details,
+            sql_conf={},
+            
source_code_location=source_code_location_to_proto(flow.source_code_location),
+        )
+        if flow.name is not None:

Review Comment:
   Good catch, I now apply the default at the API (`create_auto_cdc_flow`) 
layer, consistent with `query_name` in `append_flow` as an example.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to