This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a88c27545035 [SPARK-56650][SDP][CONNECT] Add AutoCDC Spark Connect APIs
a88c27545035 is described below

commit a88c27545035cde45e701581ab3b73467793c003
Author: Andreas Neumann <[email protected]>
AuthorDate: Thu May 21 11:03:36 2026 -0700

    [SPARK-56650][SDP][CONNECT] Add AutoCDC Spark Connect APIs
    
    ### What changes were proposed in this pull request?
    Add the spark-connect protos to support AutoCDC flows. This is mainly two 
new protos:
    
    `AutoCdcFlowDetails`: similar to the existing `WriteRelationFlowDetails`, 
describes an AutoCDC flow.
     `SCDType` an enum to specify the type of SCD (slow changing dimension) 
storage type.
    
    For now, throw "not implemented" for the new flow type.
    
    ### Why are the changes needed?
    This new API is needed to specify an AutoCDC flow in a declarative pipeline.
    
    ### Does this PR introduce _any_ user-facing change?
    It introduces a new Spark Connect API, however, no implementation yet, and 
also no language wrappers (e.g. Python), yet.
    
    ### How was this patch tested?
    No tests, as this API is unimplemented for now.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Generated-by: Claude Sonnet 4.6
    
    Closes #55589 from anew/add-sc-apis.
    
    Authored-by: Andreas Neumann <[email protected]>
    Signed-off-by: DB Tsai <[email protected]>
---
 python/pyspark/sql/connect/proto/pipelines_pb2.py  | 119 ++++++------
 python/pyspark/sql/connect/proto/pipelines_pb2.pyi | 203 ++++++++++++++++++++-
 .../main/protobuf/spark/connect/pipelines.proto    |  42 +++++
 .../sql/connect/pipelines/PipelinesHandler.scala   |  43 +++--
 4 files changed, 331 insertions(+), 76 deletions(-)

diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.py 
b/python/pyspark/sql/connect/proto/pipelines_pb2.py
index 4cca17b39cd6..ec8142c1d2aa 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.py
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.py
@@ -38,12 +38,13 @@ _sym_db = _symbol_database.Default()
 from google.protobuf import any_pb2 as google_dot_protobuf_dot_any__pb2
 from google.protobuf import timestamp_pb2 as 
google_dot_protobuf_dot_timestamp__pb2
 from pyspark.sql.connect.proto import common_pb2 as 
spark_dot_connect_dot_common__pb2
+from pyspark.sql.connect.proto import expressions_pb2 as 
spark_dot_connect_dot_expressions__pb2
 from pyspark.sql.connect.proto import relations_pb2 as 
spark_dot_connect_dot_relations__pb2
 from pyspark.sql.connect.proto import types_pb2 as 
spark_dot_connect_dot_types__pb2
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    
b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xa4\'\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01
 
\x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12R\n\rdefine_output\x18\x02
 \x01(\x0b\x32+.spark.connect.PipelineCommand.DefineOut [...]
+    
b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\x83/\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01
 
\x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12R\n\rdefine_output\x18\x02
 \x01(\x0b\x32+.s [...]
 )
 
 _globals = globals()
@@ -86,64 +87,68 @@ if not _descriptor._USE_C_DESCRIPTORS:
     _globals["_PIPELINEANALYSISCONTEXT"].fields_by_name[
         "flow_name"
     ]._serialized_options = b"\030\001"
-    _globals["_OUTPUTTYPE"]._serialized_start = 6942
-    _globals["_OUTPUTTYPE"]._serialized_end = 7047
-    _globals["_PIPELINECOMMAND"]._serialized_start = 195
-    _globals["_PIPELINECOMMAND"]._serialized_end = 5223
-    _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 1232
-    _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_end = 1540
-    
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_start 
= 1441
-    
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_end = 
1499
-    _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_start = 1542
-    _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_end = 1632
-    _globals["_PIPELINECOMMAND_DEFINEOUTPUT"]._serialized_start = 1635
-    _globals["_PIPELINECOMMAND_DEFINEOUTPUT"]._serialized_end = 2933
-    _globals["_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS"]._serialized_start = 
2171
-    _globals["_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS"]._serialized_end = 
2619
+    _globals["_OUTPUTTYPE"]._serialized_start = 7966
+    _globals["_OUTPUTTYPE"]._serialized_end = 8071
+    _globals["_PIPELINECOMMAND"]._serialized_start = 228
+    _globals["_PIPELINECOMMAND"]._serialized_end = 6247
+    _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 1265
+    _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_end = 1573
+    
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_start 
= 1474
+    
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_end = 
1532
+    _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_start = 1575
+    _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_end = 1665
+    _globals["_PIPELINECOMMAND_DEFINEOUTPUT"]._serialized_start = 1668
+    _globals["_PIPELINECOMMAND_DEFINEOUTPUT"]._serialized_end = 2966
+    _globals["_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS"]._serialized_start = 
2204
+    _globals["_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS"]._serialized_end = 
2652
     _globals[
         "_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS_TABLEPROPERTIESENTRY"
-    ]._serialized_start = 2532
+    ]._serialized_start = 2565
     _globals[
         "_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS_TABLEPROPERTIESENTRY"
-    ]._serialized_end = 2598
-    _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS"]._serialized_start = 
2622
-    _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS"]._serialized_end = 
2831
-    
_globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_start
 = 2762
-    
_globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_end
 = 2820
-    _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_start = 2936
-    _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 3831
-    _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_start = 
1441
-    _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_end = 1499
-    
_globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_start
 = 3555
-    
_globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_end
 = 3652
-    _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_start = 3654
-    _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_end = 3712
-    _globals["_PIPELINECOMMAND_EXECUTEOUTPUTFLOWS"]._serialized_start = 3834
-    _globals["_PIPELINECOMMAND_EXECUTEOUTPUTFLOWS"]._serialized_end = 4190
-    _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 4193
-    _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 4515
-    _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start = 
4518
-    _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 4717
-    
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_start
 = 4720
-    
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_end
 = 4878
-    
_globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_start = 
4881
-    _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_end 
= 5207
-    _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 5226
-    _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 5978
-    
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start 
= 5595
-    
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 
5693
-    _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_start = 
5696
-    _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_end = 
5829
-    _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_start = 
5832
-    _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_end = 5963
-    _globals["_PIPELINEEVENTRESULT"]._serialized_start = 5980
-    _globals["_PIPELINEEVENTRESULT"]._serialized_end = 6053
-    _globals["_PIPELINEEVENT"]._serialized_start = 6055
-    _globals["_PIPELINEEVENT"]._serialized_end = 6171
-    _globals["_SOURCECODELOCATION"]._serialized_start = 6174
-    _globals["_SOURCECODELOCATION"]._serialized_end = 6415
-    _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 6418
-    _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 6569
-    _globals["_PIPELINEANALYSISCONTEXT"]._serialized_start = 6572
-    _globals["_PIPELINEANALYSISCONTEXT"]._serialized_end = 6940
+    ]._serialized_end = 2631
+    _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS"]._serialized_start = 
2655
+    _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS"]._serialized_end = 
2864
+    
_globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_start
 = 2795
+    
_globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_end
 = 2853
+    _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_start = 2969
+    _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 4855
+    _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_start = 
1474
+    _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_end = 1532
+    
_globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_start
 = 3703
+    
_globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_end
 = 3800
+    
_globals["_PIPELINECOMMAND_DEFINEFLOW_AUTOCDCFLOWDETAILS"]._serialized_start = 
3803
+    _globals["_PIPELINECOMMAND_DEFINEFLOW_AUTOCDCFLOWDETAILS"]._serialized_end 
= 4623
+    _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_start = 4625
+    _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_end = 4683
+    _globals["_PIPELINECOMMAND_DEFINEFLOW_SCDTYPE"]._serialized_start = 4685
+    _globals["_PIPELINECOMMAND_DEFINEFLOW_SCDTYPE"]._serialized_end = 4736
+    _globals["_PIPELINECOMMAND_EXECUTEOUTPUTFLOWS"]._serialized_start = 4858
+    _globals["_PIPELINECOMMAND_EXECUTEOUTPUTFLOWS"]._serialized_end = 5214
+    _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 5217
+    _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 5539
+    _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start = 
5542
+    _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 5741
+    
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_start
 = 5744
+    
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_end
 = 5902
+    
_globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_start = 
5905
+    _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_end 
= 6231
+    _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 6250
+    _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 7002
+    
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start 
= 6619
+    
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 
6717
+    _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_start = 
6720
+    _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_end = 
6853
+    _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_start = 
6856
+    _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_end = 6987
+    _globals["_PIPELINEEVENTRESULT"]._serialized_start = 7004
+    _globals["_PIPELINEEVENTRESULT"]._serialized_end = 7077
+    _globals["_PIPELINEEVENT"]._serialized_start = 7079
+    _globals["_PIPELINEEVENT"]._serialized_end = 7195
+    _globals["_SOURCECODELOCATION"]._serialized_start = 7198
+    _globals["_SOURCECODELOCATION"]._serialized_end = 7439
+    _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 7442
+    _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 7593
+    _globals["_PIPELINEANALYSISCONTEXT"]._serialized_start = 7596
+    _globals["_PIPELINEANALYSISCONTEXT"]._serialized_end = 7964
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi 
b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
index ee628ddd2419..f327a7e92ac8 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
@@ -43,6 +43,7 @@ import google.protobuf.internal.enum_type_wrapper
 import google.protobuf.message
 import google.protobuf.timestamp_pb2
 import pyspark.sql.connect.proto.common_pb2
+import pyspark.sql.connect.proto.expressions_pb2
 import pyspark.sql.connect.proto.relations_pb2
 import pyspark.sql.connect.proto.types_pb2
 import sys
@@ -505,6 +506,26 @@ class PipelineCommand(google.protobuf.message.Message):
 
         DESCRIPTOR: google.protobuf.descriptor.Descriptor
 
+        class _SCDType:
+            ValueType = typing.NewType("ValueType", builtins.int)
+            V: typing_extensions.TypeAlias = ValueType
+
+        class _SCDTypeEnumTypeWrapper(
+            google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[
+                PipelineCommand.DefineFlow._SCDType.ValueType
+            ],
+            builtins.type,
+        ):  # noqa: F821
+            DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
+            SCD_TYPE_UNSPECIFIED: 
PipelineCommand.DefineFlow._SCDType.ValueType  # 0
+            SCD_TYPE_1: PipelineCommand.DefineFlow._SCDType.ValueType  # 1
+
+        class SCDType(_SCDType, metaclass=_SCDTypeEnumTypeWrapper):
+            """SCD Type for Auto CDC target tables."""
+
+        SCD_TYPE_UNSPECIFIED: PipelineCommand.DefineFlow.SCDType.ValueType  # 0
+        SCD_TYPE_1: PipelineCommand.DefineFlow.SCDType.ValueType  # 1
+
         class SqlConfEntry(google.protobuf.message.Message):
             DESCRIPTOR: google.protobuf.descriptor.Descriptor
 
@@ -554,6 +575,172 @@ class PipelineCommand(google.protobuf.message.Message):
                 self, oneof_group: typing_extensions.Literal["_relation", 
b"_relation"]
             ) -> typing_extensions.Literal["relation"] | None: ...
 
+        class AutoCdcFlowDetails(google.protobuf.message.Message):
+            """Details for Auto CDC flows."""
+
+            DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+            SOURCE_FIELD_NUMBER: builtins.int
+            KEYS_FIELD_NUMBER: builtins.int
+            SEQUENCE_BY_FIELD_NUMBER: builtins.int
+            APPLY_AS_DELETES_FIELD_NUMBER: builtins.int
+            APPLY_AS_TRUNCATES_FIELD_NUMBER: builtins.int
+            COLUMN_LIST_FIELD_NUMBER: builtins.int
+            EXCEPT_COLUMN_LIST_FIELD_NUMBER: builtins.int
+            STORED_AS_SCD_TYPE_FIELD_NUMBER: builtins.int
+            IGNORE_NULL_UPDATES_COLUMN_LIST_FIELD_NUMBER: builtins.int
+            IGNORE_NULL_UPDATES_EXCEPT_COLUMN_LIST_FIELD_NUMBER: builtins.int
+            source: builtins.str
+            """The name of the CDC source to stream from."""
+            @property
+            def keys(
+                self,
+            ) -> 
google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
+                pyspark.sql.connect.proto.expressions_pb2.Expression
+            ]:
+                """Column(s) that uniquely identify a row in source and target 
data."""
+            @property
+            def sequence_by(self) -> 
pyspark.sql.connect.proto.expressions_pb2.Expression:
+                """Expression to order the source data."""
+            @property
+            def apply_as_deletes(self) -> 
pyspark.sql.connect.proto.expressions_pb2.Expression:
+                """Delete condition for the merged operation."""
+            @property
+            def apply_as_truncates(self) -> 
pyspark.sql.connect.proto.expressions_pb2.Expression:
+                """Truncate condition for the merged operation."""
+            @property
+            def column_list(
+                self,
+            ) -> 
google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
+                pyspark.sql.connect.proto.expressions_pb2.Expression
+            ]:
+                """Columns included in the output table."""
+            @property
+            def except_column_list(
+                self,
+            ) -> 
google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
+                pyspark.sql.connect.proto.expressions_pb2.Expression
+            ]:
+                """Columns excluded from the output table."""
+            stored_as_scd_type: 
global___PipelineCommand.DefineFlow.SCDType.ValueType
+            """SCD Type for target table."""
+            @property
+            def ignore_null_updates_column_list(
+                self,
+            ) -> 
google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
+                pyspark.sql.connect.proto.expressions_pb2.Expression
+            ]:
+                """Subset of columns to ignore null in updates."""
+            @property
+            def ignore_null_updates_except_column_list(
+                self,
+            ) -> 
google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
+                pyspark.sql.connect.proto.expressions_pb2.Expression
+            ]:
+                """Subset of columns excluded from ignoring null in updates."""
+            def __init__(
+                self,
+                *,
+                source: builtins.str | None = ...,
+                keys: 
collections.abc.Iterable[pyspark.sql.connect.proto.expressions_pb2.Expression]
+                | None = ...,
+                sequence_by: 
pyspark.sql.connect.proto.expressions_pb2.Expression | None = ...,
+                apply_as_deletes: 
pyspark.sql.connect.proto.expressions_pb2.Expression | None = ...,
+                apply_as_truncates: 
pyspark.sql.connect.proto.expressions_pb2.Expression
+                | None = ...,
+                column_list: collections.abc.Iterable[
+                    pyspark.sql.connect.proto.expressions_pb2.Expression
+                ]
+                | None = ...,
+                except_column_list: collections.abc.Iterable[
+                    pyspark.sql.connect.proto.expressions_pb2.Expression
+                ]
+                | None = ...,
+                stored_as_scd_type: 
global___PipelineCommand.DefineFlow.SCDType.ValueType = ...,
+                ignore_null_updates_column_list: collections.abc.Iterable[
+                    pyspark.sql.connect.proto.expressions_pb2.Expression
+                ]
+                | None = ...,
+                ignore_null_updates_except_column_list: 
collections.abc.Iterable[
+                    pyspark.sql.connect.proto.expressions_pb2.Expression
+                ]
+                | None = ...,
+            ) -> None: ...
+            def HasField(
+                self,
+                field_name: typing_extensions.Literal[
+                    "_apply_as_deletes",
+                    b"_apply_as_deletes",
+                    "_apply_as_truncates",
+                    b"_apply_as_truncates",
+                    "_sequence_by",
+                    b"_sequence_by",
+                    "_source",
+                    b"_source",
+                    "apply_as_deletes",
+                    b"apply_as_deletes",
+                    "apply_as_truncates",
+                    b"apply_as_truncates",
+                    "sequence_by",
+                    b"sequence_by",
+                    "source",
+                    b"source",
+                ],
+            ) -> builtins.bool: ...
+            def ClearField(
+                self,
+                field_name: typing_extensions.Literal[
+                    "_apply_as_deletes",
+                    b"_apply_as_deletes",
+                    "_apply_as_truncates",
+                    b"_apply_as_truncates",
+                    "_sequence_by",
+                    b"_sequence_by",
+                    "_source",
+                    b"_source",
+                    "apply_as_deletes",
+                    b"apply_as_deletes",
+                    "apply_as_truncates",
+                    b"apply_as_truncates",
+                    "column_list",
+                    b"column_list",
+                    "except_column_list",
+                    b"except_column_list",
+                    "ignore_null_updates_column_list",
+                    b"ignore_null_updates_column_list",
+                    "ignore_null_updates_except_column_list",
+                    b"ignore_null_updates_except_column_list",
+                    "keys",
+                    b"keys",
+                    "sequence_by",
+                    b"sequence_by",
+                    "source",
+                    b"source",
+                    "stored_as_scd_type",
+                    b"stored_as_scd_type",
+                ],
+            ) -> None: ...
+            @typing.overload
+            def WhichOneof(
+                self,
+                oneof_group: typing_extensions.Literal["_apply_as_deletes", 
b"_apply_as_deletes"],
+            ) -> typing_extensions.Literal["apply_as_deletes"] | None: ...
+            @typing.overload
+            def WhichOneof(
+                self,
+                oneof_group: typing_extensions.Literal[
+                    "_apply_as_truncates", b"_apply_as_truncates"
+                ],
+            ) -> typing_extensions.Literal["apply_as_truncates"] | None: ...
+            @typing.overload
+            def WhichOneof(
+                self, oneof_group: typing_extensions.Literal["_sequence_by", 
b"_sequence_by"]
+            ) -> typing_extensions.Literal["sequence_by"] | None: ...
+            @typing.overload
+            def WhichOneof(
+                self, oneof_group: typing_extensions.Literal["_source", 
b"_source"]
+            ) -> typing_extensions.Literal["source"] | None: ...
+
         class Response(google.protobuf.message.Message):
             DESCRIPTOR: google.protobuf.descriptor.Descriptor
 
@@ -588,6 +775,7 @@ class PipelineCommand(google.protobuf.message.Message):
         CLIENT_ID_FIELD_NUMBER: builtins.int
         SOURCE_CODE_LOCATION_FIELD_NUMBER: builtins.int
         RELATION_FLOW_DETAILS_FIELD_NUMBER: builtins.int
+        AUTO_CDC_FLOW_DETAILS_FIELD_NUMBER: builtins.int
         EXTENSION_FIELD_NUMBER: builtins.int
         ONCE_FIELD_NUMBER: builtins.int
         dataflow_graph_id: builtins.str
@@ -613,6 +801,10 @@ class PipelineCommand(google.protobuf.message.Message):
             self,
         ) -> global___PipelineCommand.DefineFlow.WriteRelationFlowDetails: ...
         @property
+        def auto_cdc_flow_details(
+            self,
+        ) -> global___PipelineCommand.DefineFlow.AutoCdcFlowDetails: ...
+        @property
         def extension(self) -> google.protobuf.any_pb2.Any: ...
         once: builtins.bool
         """If true, define the flow as a one-time flow, such as for backfill.
@@ -632,6 +824,8 @@ class PipelineCommand(google.protobuf.message.Message):
             source_code_location: global___SourceCodeLocation | None = ...,
             relation_flow_details: 
global___PipelineCommand.DefineFlow.WriteRelationFlowDetails
             | None = ...,
+            auto_cdc_flow_details: 
global___PipelineCommand.DefineFlow.AutoCdcFlowDetails
+            | None = ...,
             extension: google.protobuf.any_pb2.Any | None = ...,
             once: builtins.bool | None = ...,
         ) -> None: ...
@@ -650,6 +844,8 @@ class PipelineCommand(google.protobuf.message.Message):
                 b"_source_code_location",
                 "_target_dataset_name",
                 b"_target_dataset_name",
+                "auto_cdc_flow_details",
+                b"auto_cdc_flow_details",
                 "client_id",
                 b"client_id",
                 "dataflow_graph_id",
@@ -685,6 +881,8 @@ class PipelineCommand(google.protobuf.message.Message):
                 b"_source_code_location",
                 "_target_dataset_name",
                 b"_target_dataset_name",
+                "auto_cdc_flow_details",
+                b"auto_cdc_flow_details",
                 "client_id",
                 b"client_id",
                 "dataflow_graph_id",
@@ -739,7 +937,10 @@ class PipelineCommand(google.protobuf.message.Message):
         @typing.overload
         def WhichOneof(
             self, oneof_group: typing_extensions.Literal["details", b"details"]
-        ) -> typing_extensions.Literal["relation_flow_details", "extension"] | 
None: ...
+        ) -> (
+            typing_extensions.Literal["relation_flow_details", 
"auto_cdc_flow_details", "extension"]
+            | None
+        ): ...
 
     class ExecuteOutputFlows(google.protobuf.message.Message):
         """Request to execute all flows for a single output (dataset or sink) 
remotely."""
diff --git a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto 
b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
index 6438583c9d47..7632ec95b9b5 100644
--- a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
+++ b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
@@ -22,6 +22,7 @@ package spark.connect;
 import "google/protobuf/any.proto";
 import "google/protobuf/timestamp.proto";
 import "spark/connect/common.proto";
+import "spark/connect/expressions.proto";
 import "spark/connect/relations.proto";
 import "spark/connect/types.proto";
 
@@ -144,6 +145,7 @@ message PipelineCommand {
 
     oneof details {
       WriteRelationFlowDetails relation_flow_details = 7;
+      AutoCdcFlowDetails auto_cdc_flow_details = 10;
       google.protobuf.Any extension = 999;
     }
 
@@ -154,6 +156,46 @@ message PipelineCommand {
       optional spark.connect.Relation relation = 1;
     }
 
+    // Details for Auto CDC flows.
+    message AutoCdcFlowDetails {
+      // The name of the CDC source to stream from.
+      optional string source = 1;
+
+      // Column(s) that uniquely identify a row in source and target data.
+      repeated Expression keys = 2;
+
+      // Expression to order the source data.
+      optional Expression sequence_by = 3;
+
+      // Delete condition for the merged operation.
+      optional Expression apply_as_deletes = 6;
+
+      // Truncate condition for the merged operation.
+      optional Expression apply_as_truncates = 7;
+
+      // Columns included in the output table.
+      repeated Expression column_list = 8;
+
+      // Columns excluded from the output table.
+      repeated Expression except_column_list = 9;
+
+      // SCD Type for target table.
+      SCDType stored_as_scd_type = 10;
+
+      // Subset of columns to ignore null in updates.
+      repeated Expression ignore_null_updates_column_list = 14;
+
+      // Subset of columns excluded from ignoring null in updates.
+      repeated Expression ignore_null_updates_except_column_list = 15;
+
+    }
+
+    // SCD Type for Auto CDC target tables.
+    enum SCDType {
+      SCD_TYPE_UNSPECIFIED = 0;
+      SCD_TYPE_1 = 1;
+    }
+
     // If true, define the flow as a one-time flow, such as for backfill.
     // Set to true changes the flow in two ways:
     //   - The flow is run one time by default. If the pipeline is ran with a 
full refresh,
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
index dd4799765040..ee19b83c5162 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
@@ -367,24 +367,31 @@ private[connect] object PipelinesHandler extends Logging {
         }
       }
 
-    val relationFlowDetails = flow.getRelationFlowDetails
-    graphElementRegistry.registerFlow(
-      UnresolvedFlow(
-        identifier = flowIdentifier,
-        destinationIdentifier = destinationIdentifier,
-        func = FlowAnalysis.createFlowFunctionFromLogicalPlan(
-          transformRelationFunc(relationFlowDetails.getRelation)),
-        sqlConf = flow.getSqlConfMap.asScala.toMap,
-        once = false,
-        queryContext = QueryContext(Option(defaultCatalog), 
Option(defaultDatabase)),
-        origin = QueryOrigin(
-          filePath = Option.when(flow.getSourceCodeLocation.hasFileName)(
-            flow.getSourceCodeLocation.getFileName),
-          line = Option.when(flow.getSourceCodeLocation.hasLineNumber)(
-            flow.getSourceCodeLocation.getLineNumber),
-          objectType = Some(QueryOriginType.Flow.toString),
-          objectName = Option(flowIdentifier.unquotedString),
-          language = Some(Python()))))
+    flow.getDetailsCase match {
+      case proto.PipelineCommand.DefineFlow.DetailsCase.RELATION_FLOW_DETAILS 
=>
+        val relationFlowDetails = flow.getRelationFlowDetails
+        graphElementRegistry.registerFlow(
+          UnresolvedFlow(
+            identifier = flowIdentifier,
+            destinationIdentifier = destinationIdentifier,
+            func = FlowAnalysis.createFlowFunctionFromLogicalPlan(
+              transformRelationFunc(relationFlowDetails.getRelation)),
+            sqlConf = flow.getSqlConfMap.asScala.toMap,
+            once = false,
+            queryContext = QueryContext(Option(defaultCatalog), 
Option(defaultDatabase)),
+            origin = QueryOrigin(
+              filePath = Option.when(flow.getSourceCodeLocation.hasFileName)(
+                flow.getSourceCodeLocation.getFileName),
+              line = Option.when(flow.getSourceCodeLocation.hasLineNumber)(
+                flow.getSourceCodeLocation.getLineNumber),
+              objectType = Some(QueryOriginType.Flow.toString),
+              objectName = Option(flowIdentifier.unquotedString),
+              language = Some(Python()))))
+      case proto.PipelineCommand.DefineFlow.DetailsCase.AUTO_CDC_FLOW_DETAILS 
=>
+        throw new UnsupportedOperationException("AutoCdcFlowDetails is not yet 
implemented.")
+      case other =>
+        throw new UnsupportedOperationException(s"Unsupported DefineFlow 
details case: $other")
+    }
     flowIdentifier
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to