This is an automated email from the ASF dual-hosted git repository.
dbtsai pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new f6312fbfc35d [SPARK-56650][SDP][CONNECT] Add AutoCDC Spark Connect APIs
f6312fbfc35d is described below
commit f6312fbfc35deb1dd9de578ebf3203d7be42099e
Author: Andreas Neumann <[email protected]>
AuthorDate: Thu May 21 11:03:36 2026 -0700
[SPARK-56650][SDP][CONNECT] Add AutoCDC Spark Connect APIs
### What changes were proposed in this pull request?
Add the Spark Connect protos to support AutoCDC flows. These are mainly two
new protos:
`AutoCdcFlowDetails`: similar to the existing `WriteRelationFlowDetails`,
describes an AutoCDC flow.
`SCDType`: an enum that specifies the SCD (slowly changing dimension)
storage type of the AutoCDC target table.
For now, the server throws an `UnsupportedOperationException` ("not yet implemented") for the new flow type.
### Why are the changes needed?
This new API is needed to specify an AutoCDC flow in a declarative pipeline.
### Does this PR introduce _any_ user-facing change?
It introduces a new Spark Connect API; however, there is no implementation yet,
and there are no language wrappers (e.g. Python) yet.
### How was this patch tested?
No tests, as this API is unimplemented for now.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Sonnet 4.6
Closes #55589 from anew/add-sc-apis.
Authored-by: Andreas Neumann <[email protected]>
Signed-off-by: DB Tsai <[email protected]>
(cherry picked from commit a88c27545035cde45e701581ab3b73467793c003)
Signed-off-by: DB Tsai <[email protected]>
---
python/pyspark/sql/connect/proto/pipelines_pb2.py | 119 ++++++------
python/pyspark/sql/connect/proto/pipelines_pb2.pyi | 203 ++++++++++++++++++++-
.../main/protobuf/spark/connect/pipelines.proto | 42 +++++
.../sql/connect/pipelines/PipelinesHandler.scala | 43 +++--
4 files changed, 331 insertions(+), 76 deletions(-)
diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.py
b/python/pyspark/sql/connect/proto/pipelines_pb2.py
index 4cca17b39cd6..ec8142c1d2aa 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.py
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.py
@@ -38,12 +38,13 @@ _sym_db = _symbol_database.Default()
from google.protobuf import any_pb2 as google_dot_protobuf_dot_any__pb2
from google.protobuf import timestamp_pb2 as
google_dot_protobuf_dot_timestamp__pb2
from pyspark.sql.connect.proto import common_pb2 as
spark_dot_connect_dot_common__pb2
+from pyspark.sql.connect.proto import expressions_pb2 as
spark_dot_connect_dot_expressions__pb2
from pyspark.sql.connect.proto import relations_pb2 as
spark_dot_connect_dot_relations__pb2
from pyspark.sql.connect.proto import types_pb2 as
spark_dot_connect_dot_types__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-
b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xa4\'\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01
\x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12R\n\rdefine_output\x18\x02
\x01(\x0b\x32+.spark.connect.PipelineCommand.DefineOut [...]
+
b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\x83/\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01
\x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12R\n\rdefine_output\x18\x02
\x01(\x0b\x32+.s [...]
)
_globals = globals()
@@ -86,64 +87,68 @@ if not _descriptor._USE_C_DESCRIPTORS:
_globals["_PIPELINEANALYSISCONTEXT"].fields_by_name[
"flow_name"
]._serialized_options = b"\030\001"
- _globals["_OUTPUTTYPE"]._serialized_start = 6942
- _globals["_OUTPUTTYPE"]._serialized_end = 7047
- _globals["_PIPELINECOMMAND"]._serialized_start = 195
- _globals["_PIPELINECOMMAND"]._serialized_end = 5223
- _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 1232
- _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_end = 1540
-
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_start
= 1441
-
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_end =
1499
- _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_start = 1542
- _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_end = 1632
- _globals["_PIPELINECOMMAND_DEFINEOUTPUT"]._serialized_start = 1635
- _globals["_PIPELINECOMMAND_DEFINEOUTPUT"]._serialized_end = 2933
- _globals["_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS"]._serialized_start =
2171
- _globals["_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS"]._serialized_end =
2619
+ _globals["_OUTPUTTYPE"]._serialized_start = 7966
+ _globals["_OUTPUTTYPE"]._serialized_end = 8071
+ _globals["_PIPELINECOMMAND"]._serialized_start = 228
+ _globals["_PIPELINECOMMAND"]._serialized_end = 6247
+ _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 1265
+ _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_end = 1573
+
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_start
= 1474
+
_globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_end =
1532
+ _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_start = 1575
+ _globals["_PIPELINECOMMAND_DROPDATAFLOWGRAPH"]._serialized_end = 1665
+ _globals["_PIPELINECOMMAND_DEFINEOUTPUT"]._serialized_start = 1668
+ _globals["_PIPELINECOMMAND_DEFINEOUTPUT"]._serialized_end = 2966
+ _globals["_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS"]._serialized_start =
2204
+ _globals["_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS"]._serialized_end =
2652
_globals[
"_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS_TABLEPROPERTIESENTRY"
- ]._serialized_start = 2532
+ ]._serialized_start = 2565
_globals[
"_PIPELINECOMMAND_DEFINEOUTPUT_TABLEDETAILS_TABLEPROPERTIESENTRY"
- ]._serialized_end = 2598
- _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS"]._serialized_start =
2622
- _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS"]._serialized_end =
2831
-
_globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_start
= 2762
-
_globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_end
= 2820
- _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_start = 2936
- _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 3831
- _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_start =
1441
- _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_end = 1499
-
_globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_start
= 3555
-
_globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_end
= 3652
- _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_start = 3654
- _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_end = 3712
- _globals["_PIPELINECOMMAND_EXECUTEOUTPUTFLOWS"]._serialized_start = 3834
- _globals["_PIPELINECOMMAND_EXECUTEOUTPUTFLOWS"]._serialized_end = 4190
- _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 4193
- _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 4515
- _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start =
4518
- _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 4717
-
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_start
= 4720
-
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_end
= 4878
-
_globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_start =
4881
- _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_end
= 5207
- _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 5226
- _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 5978
-
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start
= 5595
-
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end =
5693
- _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_start =
5696
- _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_end =
5829
- _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_start =
5832
- _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_end = 5963
- _globals["_PIPELINEEVENTRESULT"]._serialized_start = 5980
- _globals["_PIPELINEEVENTRESULT"]._serialized_end = 6053
- _globals["_PIPELINEEVENT"]._serialized_start = 6055
- _globals["_PIPELINEEVENT"]._serialized_end = 6171
- _globals["_SOURCECODELOCATION"]._serialized_start = 6174
- _globals["_SOURCECODELOCATION"]._serialized_end = 6415
- _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 6418
- _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 6569
- _globals["_PIPELINEANALYSISCONTEXT"]._serialized_start = 6572
- _globals["_PIPELINEANALYSISCONTEXT"]._serialized_end = 6940
+ ]._serialized_end = 2631
+ _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS"]._serialized_start =
2655
+ _globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS"]._serialized_end =
2864
+
_globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_start
= 2795
+
_globals["_PIPELINECOMMAND_DEFINEOUTPUT_SINKDETAILS_OPTIONSENTRY"]._serialized_end
= 2853
+ _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_start = 2969
+ _globals["_PIPELINECOMMAND_DEFINEFLOW"]._serialized_end = 4855
+ _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_start =
1474
+ _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_end = 1532
+
_globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_start
= 3703
+
_globals["_PIPELINECOMMAND_DEFINEFLOW_WRITERELATIONFLOWDETAILS"]._serialized_end
= 3800
+
_globals["_PIPELINECOMMAND_DEFINEFLOW_AUTOCDCFLOWDETAILS"]._serialized_start =
3803
+ _globals["_PIPELINECOMMAND_DEFINEFLOW_AUTOCDCFLOWDETAILS"]._serialized_end
= 4623
+ _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_start = 4625
+ _globals["_PIPELINECOMMAND_DEFINEFLOW_RESPONSE"]._serialized_end = 4683
+ _globals["_PIPELINECOMMAND_DEFINEFLOW_SCDTYPE"]._serialized_start = 4685
+ _globals["_PIPELINECOMMAND_DEFINEFLOW_SCDTYPE"]._serialized_end = 4736
+ _globals["_PIPELINECOMMAND_EXECUTEOUTPUTFLOWS"]._serialized_start = 4858
+ _globals["_PIPELINECOMMAND_EXECUTEOUTPUTFLOWS"]._serialized_end = 5214
+ _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 5217
+ _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 5539
+ _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start =
5542
+ _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 5741
+
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_start
= 5744
+
_globals["_PIPELINECOMMAND_GETQUERYFUNCTIONEXECUTIONSIGNALSTREAM"]._serialized_end
= 5902
+
_globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_start =
5905
+ _globals["_PIPELINECOMMAND_DEFINEFLOWQUERYFUNCTIONRESULT"]._serialized_end
= 6231
+ _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 6250
+ _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 7002
+
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start
= 6619
+
_globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end =
6717
+ _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_start =
6720
+ _globals["_PIPELINECOMMANDRESULT_DEFINEOUTPUTRESULT"]._serialized_end =
6853
+ _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_start =
6856
+ _globals["_PIPELINECOMMANDRESULT_DEFINEFLOWRESULT"]._serialized_end = 6987
+ _globals["_PIPELINEEVENTRESULT"]._serialized_start = 7004
+ _globals["_PIPELINEEVENTRESULT"]._serialized_end = 7077
+ _globals["_PIPELINEEVENT"]._serialized_start = 7079
+ _globals["_PIPELINEEVENT"]._serialized_end = 7195
+ _globals["_SOURCECODELOCATION"]._serialized_start = 7198
+ _globals["_SOURCECODELOCATION"]._serialized_end = 7439
+ _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_start = 7442
+ _globals["_PIPELINEQUERYFUNCTIONEXECUTIONSIGNAL"]._serialized_end = 7593
+ _globals["_PIPELINEANALYSISCONTEXT"]._serialized_start = 7596
+ _globals["_PIPELINEANALYSISCONTEXT"]._serialized_end = 7964
# @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
index ee628ddd2419..f327a7e92ac8 100644
--- a/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/pipelines_pb2.pyi
@@ -43,6 +43,7 @@ import google.protobuf.internal.enum_type_wrapper
import google.protobuf.message
import google.protobuf.timestamp_pb2
import pyspark.sql.connect.proto.common_pb2
+import pyspark.sql.connect.proto.expressions_pb2
import pyspark.sql.connect.proto.relations_pb2
import pyspark.sql.connect.proto.types_pb2
import sys
@@ -505,6 +506,26 @@ class PipelineCommand(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
+ class _SCDType:
+ ValueType = typing.NewType("ValueType", builtins.int)
+ V: typing_extensions.TypeAlias = ValueType
+
+ class _SCDTypeEnumTypeWrapper(
+ google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[
+ PipelineCommand.DefineFlow._SCDType.ValueType
+ ],
+ builtins.type,
+ ): # noqa: F821
+ DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
+ SCD_TYPE_UNSPECIFIED:
PipelineCommand.DefineFlow._SCDType.ValueType # 0
+ SCD_TYPE_1: PipelineCommand.DefineFlow._SCDType.ValueType # 1
+
+ class SCDType(_SCDType, metaclass=_SCDTypeEnumTypeWrapper):
+ """SCD Type for Auto CDC target tables."""
+
+ SCD_TYPE_UNSPECIFIED: PipelineCommand.DefineFlow.SCDType.ValueType # 0
+ SCD_TYPE_1: PipelineCommand.DefineFlow.SCDType.ValueType # 1
+
class SqlConfEntry(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
@@ -554,6 +575,172 @@ class PipelineCommand(google.protobuf.message.Message):
self, oneof_group: typing_extensions.Literal["_relation",
b"_relation"]
) -> typing_extensions.Literal["relation"] | None: ...
+ class AutoCdcFlowDetails(google.protobuf.message.Message):
+ """Details for Auto CDC flows."""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ SOURCE_FIELD_NUMBER: builtins.int
+ KEYS_FIELD_NUMBER: builtins.int
+ SEQUENCE_BY_FIELD_NUMBER: builtins.int
+ APPLY_AS_DELETES_FIELD_NUMBER: builtins.int
+ APPLY_AS_TRUNCATES_FIELD_NUMBER: builtins.int
+ COLUMN_LIST_FIELD_NUMBER: builtins.int
+ EXCEPT_COLUMN_LIST_FIELD_NUMBER: builtins.int
+ STORED_AS_SCD_TYPE_FIELD_NUMBER: builtins.int
+ IGNORE_NULL_UPDATES_COLUMN_LIST_FIELD_NUMBER: builtins.int
+ IGNORE_NULL_UPDATES_EXCEPT_COLUMN_LIST_FIELD_NUMBER: builtins.int
+ source: builtins.str
+ """The name of the CDC source to stream from."""
+ @property
+ def keys(
+ self,
+ ) ->
google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
+ pyspark.sql.connect.proto.expressions_pb2.Expression
+ ]:
+ """Column(s) that uniquely identify a row in source and target
data."""
+ @property
+ def sequence_by(self) ->
pyspark.sql.connect.proto.expressions_pb2.Expression:
+ """Expression to order the source data."""
+ @property
+ def apply_as_deletes(self) ->
pyspark.sql.connect.proto.expressions_pb2.Expression:
+ """Delete condition for the merged operation."""
+ @property
+ def apply_as_truncates(self) ->
pyspark.sql.connect.proto.expressions_pb2.Expression:
+ """Truncate condition for the merged operation."""
+ @property
+ def column_list(
+ self,
+ ) ->
google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
+ pyspark.sql.connect.proto.expressions_pb2.Expression
+ ]:
+ """Columns included in the output table."""
+ @property
+ def except_column_list(
+ self,
+ ) ->
google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
+ pyspark.sql.connect.proto.expressions_pb2.Expression
+ ]:
+ """Columns excluded from the output table."""
+ stored_as_scd_type:
global___PipelineCommand.DefineFlow.SCDType.ValueType
+ """SCD Type for target table."""
+ @property
+ def ignore_null_updates_column_list(
+ self,
+ ) ->
google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
+ pyspark.sql.connect.proto.expressions_pb2.Expression
+ ]:
+ """Subset of columns to ignore null in updates."""
+ @property
+ def ignore_null_updates_except_column_list(
+ self,
+ ) ->
google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
+ pyspark.sql.connect.proto.expressions_pb2.Expression
+ ]:
+ """Subset of columns excluded from ignoring null in updates."""
+ def __init__(
+ self,
+ *,
+ source: builtins.str | None = ...,
+ keys:
collections.abc.Iterable[pyspark.sql.connect.proto.expressions_pb2.Expression]
+ | None = ...,
+ sequence_by:
pyspark.sql.connect.proto.expressions_pb2.Expression | None = ...,
+ apply_as_deletes:
pyspark.sql.connect.proto.expressions_pb2.Expression | None = ...,
+ apply_as_truncates:
pyspark.sql.connect.proto.expressions_pb2.Expression
+ | None = ...,
+ column_list: collections.abc.Iterable[
+ pyspark.sql.connect.proto.expressions_pb2.Expression
+ ]
+ | None = ...,
+ except_column_list: collections.abc.Iterable[
+ pyspark.sql.connect.proto.expressions_pb2.Expression
+ ]
+ | None = ...,
+ stored_as_scd_type:
global___PipelineCommand.DefineFlow.SCDType.ValueType = ...,
+ ignore_null_updates_column_list: collections.abc.Iterable[
+ pyspark.sql.connect.proto.expressions_pb2.Expression
+ ]
+ | None = ...,
+ ignore_null_updates_except_column_list:
collections.abc.Iterable[
+ pyspark.sql.connect.proto.expressions_pb2.Expression
+ ]
+ | None = ...,
+ ) -> None: ...
+ def HasField(
+ self,
+ field_name: typing_extensions.Literal[
+ "_apply_as_deletes",
+ b"_apply_as_deletes",
+ "_apply_as_truncates",
+ b"_apply_as_truncates",
+ "_sequence_by",
+ b"_sequence_by",
+ "_source",
+ b"_source",
+ "apply_as_deletes",
+ b"apply_as_deletes",
+ "apply_as_truncates",
+ b"apply_as_truncates",
+ "sequence_by",
+ b"sequence_by",
+ "source",
+ b"source",
+ ],
+ ) -> builtins.bool: ...
+ def ClearField(
+ self,
+ field_name: typing_extensions.Literal[
+ "_apply_as_deletes",
+ b"_apply_as_deletes",
+ "_apply_as_truncates",
+ b"_apply_as_truncates",
+ "_sequence_by",
+ b"_sequence_by",
+ "_source",
+ b"_source",
+ "apply_as_deletes",
+ b"apply_as_deletes",
+ "apply_as_truncates",
+ b"apply_as_truncates",
+ "column_list",
+ b"column_list",
+ "except_column_list",
+ b"except_column_list",
+ "ignore_null_updates_column_list",
+ b"ignore_null_updates_column_list",
+ "ignore_null_updates_except_column_list",
+ b"ignore_null_updates_except_column_list",
+ "keys",
+ b"keys",
+ "sequence_by",
+ b"sequence_by",
+ "source",
+ b"source",
+ "stored_as_scd_type",
+ b"stored_as_scd_type",
+ ],
+ ) -> None: ...
+ @typing.overload
+ def WhichOneof(
+ self,
+ oneof_group: typing_extensions.Literal["_apply_as_deletes",
b"_apply_as_deletes"],
+ ) -> typing_extensions.Literal["apply_as_deletes"] | None: ...
+ @typing.overload
+ def WhichOneof(
+ self,
+ oneof_group: typing_extensions.Literal[
+ "_apply_as_truncates", b"_apply_as_truncates"
+ ],
+ ) -> typing_extensions.Literal["apply_as_truncates"] | None: ...
+ @typing.overload
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_sequence_by",
b"_sequence_by"]
+ ) -> typing_extensions.Literal["sequence_by"] | None: ...
+ @typing.overload
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_source",
b"_source"]
+ ) -> typing_extensions.Literal["source"] | None: ...
+
class Response(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
@@ -588,6 +775,7 @@ class PipelineCommand(google.protobuf.message.Message):
CLIENT_ID_FIELD_NUMBER: builtins.int
SOURCE_CODE_LOCATION_FIELD_NUMBER: builtins.int
RELATION_FLOW_DETAILS_FIELD_NUMBER: builtins.int
+ AUTO_CDC_FLOW_DETAILS_FIELD_NUMBER: builtins.int
EXTENSION_FIELD_NUMBER: builtins.int
ONCE_FIELD_NUMBER: builtins.int
dataflow_graph_id: builtins.str
@@ -613,6 +801,10 @@ class PipelineCommand(google.protobuf.message.Message):
self,
) -> global___PipelineCommand.DefineFlow.WriteRelationFlowDetails: ...
@property
+ def auto_cdc_flow_details(
+ self,
+ ) -> global___PipelineCommand.DefineFlow.AutoCdcFlowDetails: ...
+ @property
def extension(self) -> google.protobuf.any_pb2.Any: ...
once: builtins.bool
"""If true, define the flow as a one-time flow, such as for backfill.
@@ -632,6 +824,8 @@ class PipelineCommand(google.protobuf.message.Message):
source_code_location: global___SourceCodeLocation | None = ...,
relation_flow_details:
global___PipelineCommand.DefineFlow.WriteRelationFlowDetails
| None = ...,
+ auto_cdc_flow_details:
global___PipelineCommand.DefineFlow.AutoCdcFlowDetails
+ | None = ...,
extension: google.protobuf.any_pb2.Any | None = ...,
once: builtins.bool | None = ...,
) -> None: ...
@@ -650,6 +844,8 @@ class PipelineCommand(google.protobuf.message.Message):
b"_source_code_location",
"_target_dataset_name",
b"_target_dataset_name",
+ "auto_cdc_flow_details",
+ b"auto_cdc_flow_details",
"client_id",
b"client_id",
"dataflow_graph_id",
@@ -685,6 +881,8 @@ class PipelineCommand(google.protobuf.message.Message):
b"_source_code_location",
"_target_dataset_name",
b"_target_dataset_name",
+ "auto_cdc_flow_details",
+ b"auto_cdc_flow_details",
"client_id",
b"client_id",
"dataflow_graph_id",
@@ -739,7 +937,10 @@ class PipelineCommand(google.protobuf.message.Message):
@typing.overload
def WhichOneof(
self, oneof_group: typing_extensions.Literal["details", b"details"]
- ) -> typing_extensions.Literal["relation_flow_details", "extension"] |
None: ...
+ ) -> (
+ typing_extensions.Literal["relation_flow_details",
"auto_cdc_flow_details", "extension"]
+ | None
+ ): ...
class ExecuteOutputFlows(google.protobuf.message.Message):
"""Request to execute all flows for a single output (dataset or sink)
remotely."""
diff --git a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
index 6438583c9d47..7632ec95b9b5 100644
--- a/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
+++ b/sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto
@@ -22,6 +22,7 @@ package spark.connect;
import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";
import "spark/connect/common.proto";
+import "spark/connect/expressions.proto";
import "spark/connect/relations.proto";
import "spark/connect/types.proto";
@@ -144,6 +145,7 @@ message PipelineCommand {
oneof details {
WriteRelationFlowDetails relation_flow_details = 7;
+ AutoCdcFlowDetails auto_cdc_flow_details = 10;
google.protobuf.Any extension = 999;
}
@@ -154,6 +156,46 @@ message PipelineCommand {
optional spark.connect.Relation relation = 1;
}
+ // Details for Auto CDC flows.
+ message AutoCdcFlowDetails {
+ // The name of the CDC source to stream from.
+ optional string source = 1;
+
+ // Column(s) that uniquely identify a row in source and target data.
+ repeated Expression keys = 2;
+
+ // Expression to order the source data.
+ optional Expression sequence_by = 3;
+
+ // Delete condition for the merged operation.
+ optional Expression apply_as_deletes = 6;
+
+ // Truncate condition for the merged operation.
+ optional Expression apply_as_truncates = 7;
+
+ // Columns included in the output table.
+ repeated Expression column_list = 8;
+
+ // Columns excluded from the output table.
+ repeated Expression except_column_list = 9;
+
+ // SCD Type for target table.
+ SCDType stored_as_scd_type = 10;
+
+ // Subset of columns to ignore null in updates.
+ repeated Expression ignore_null_updates_column_list = 14;
+
+ // Subset of columns excluded from ignoring null in updates.
+ repeated Expression ignore_null_updates_except_column_list = 15;
+
+ }
+
+ // SCD Type for Auto CDC target tables.
+ enum SCDType {
+ SCD_TYPE_UNSPECIFIED = 0;
+ SCD_TYPE_1 = 1;
+ }
+
// If true, define the flow as a one-time flow, such as for backfill.
// Set to true changes the flow in two ways:
// - The flow is run one time by default. If the pipeline is ran with a
full refresh,
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
index dd4799765040..ee19b83c5162 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
@@ -367,24 +367,31 @@ private[connect] object PipelinesHandler extends Logging {
}
}
- val relationFlowDetails = flow.getRelationFlowDetails
- graphElementRegistry.registerFlow(
- UnresolvedFlow(
- identifier = flowIdentifier,
- destinationIdentifier = destinationIdentifier,
- func = FlowAnalysis.createFlowFunctionFromLogicalPlan(
- transformRelationFunc(relationFlowDetails.getRelation)),
- sqlConf = flow.getSqlConfMap.asScala.toMap,
- once = false,
- queryContext = QueryContext(Option(defaultCatalog),
Option(defaultDatabase)),
- origin = QueryOrigin(
- filePath = Option.when(flow.getSourceCodeLocation.hasFileName)(
- flow.getSourceCodeLocation.getFileName),
- line = Option.when(flow.getSourceCodeLocation.hasLineNumber)(
- flow.getSourceCodeLocation.getLineNumber),
- objectType = Some(QueryOriginType.Flow.toString),
- objectName = Option(flowIdentifier.unquotedString),
- language = Some(Python()))))
+ flow.getDetailsCase match {
+ case proto.PipelineCommand.DefineFlow.DetailsCase.RELATION_FLOW_DETAILS
=>
+ val relationFlowDetails = flow.getRelationFlowDetails
+ graphElementRegistry.registerFlow(
+ UnresolvedFlow(
+ identifier = flowIdentifier,
+ destinationIdentifier = destinationIdentifier,
+ func = FlowAnalysis.createFlowFunctionFromLogicalPlan(
+ transformRelationFunc(relationFlowDetails.getRelation)),
+ sqlConf = flow.getSqlConfMap.asScala.toMap,
+ once = false,
+ queryContext = QueryContext(Option(defaultCatalog),
Option(defaultDatabase)),
+ origin = QueryOrigin(
+ filePath = Option.when(flow.getSourceCodeLocation.hasFileName)(
+ flow.getSourceCodeLocation.getFileName),
+ line = Option.when(flow.getSourceCodeLocation.hasLineNumber)(
+ flow.getSourceCodeLocation.getLineNumber),
+ objectType = Some(QueryOriginType.Flow.toString),
+ objectName = Option(flowIdentifier.unquotedString),
+ language = Some(Python()))))
+ case proto.PipelineCommand.DefineFlow.DetailsCase.AUTO_CDC_FLOW_DETAILS
=>
+ throw new UnsupportedOperationException("AutoCdcFlowDetails is not yet
implemented.")
+ case other =>
+ throw new UnsupportedOperationException(s"Unsupported DefineFlow
details case: $other")
+ }
flowIdentifier
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]