This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a263a5e4ce87 [SPARK-55280][CONNECT] Add GetStatus proto to support
execution status monitoring
a263a5e4ce87 is described below
commit a263a5e4ce873f873b6332ab80c4f1829eddef0c
Author: Anastasiia Terenteva <[email protected]>
AuthorDate: Tue Feb 3 17:33:38 2026 -0400
[SPARK-55280][CONNECT] Add GetStatus proto to support execution status
monitoring
### What changes were proposed in this pull request?
Add GetStatus proto to support execution status monitoring.
Initially, this RPC will provide operation statuses. It's designed to be
extensible in two ways:
- Requesting different status types (operation-level, session-level, some
specific stats).
- Requesting different information in each type.
### Why are the changes needed?
This API will allow users to monitor the status (e.g.
RUNNING/TERMINATING/TERMINATED) of the operations in their session.
This is particularly useful for multi-threaded sessions, where one "monitoring"
thread has to know the statuses of operations across the whole session.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Only the proto definition is added so far. The handler and tests will be added separately.
### Was this patch authored or co-authored using generative AI tooling?
Yes.
Generated-by: Claude 4.5 Opus
Closes #54062 from terana/connect-get-status.
Authored-by: Anastasiia Terenteva <[email protected]>
Signed-off-by: Herman van Hövell <[email protected]>
---
python/pyspark/sql/connect/proto/base_pb2.py | 20 +-
python/pyspark/sql/connect/proto/base_pb2.pyi | 267 +++++++++++++++++++++
python/pyspark/sql/connect/proto/base_pb2_grpc.py | 47 ++++
.../src/main/protobuf/spark/connect/base.proto | 88 ++++++-
4 files changed, 416 insertions(+), 6 deletions(-)
diff --git a/python/pyspark/sql/connect/proto/base_pb2.py
b/python/pyspark/sql/connect/proto/base_pb2.py
index 08950134babb..6119d8dc5539 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.py
+++ b/python/pyspark/sql/connect/proto/base_pb2.py
@@ -45,7 +45,7 @@ from pyspark.sql.connect.proto import pipelines_pb2 as
spark_dot_connect_dot_pip
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-
b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto\x1a\x16spark/connect/ml.proto\x1a\x1dspark/connect/pipelines.proto"\xe3\x03\n\x04Plan\x12-\n\x04root\x18\x01
\x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02
\x01(\x0b\x32\x16.spark.conn [...]
+
b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto\x1a\x16spark/connect/ml.proto\x1a\x1dspark/connect/pipelines.proto"\xe3\x03\n\x04Plan\x12-\n\x04root\x18\x01
\x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02
\x01(\x0b\x32\x16.spark.conn [...]
)
_globals = globals()
@@ -70,8 +70,8 @@ if not _descriptor._USE_C_DESCRIPTORS:
_globals[
"_FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE_MESSAGEPARAMETERSENTRY"
]._serialized_options = b"8\001"
- _globals["_COMPRESSIONCODEC"]._serialized_start = 18571
- _globals["_COMPRESSIONCODEC"]._serialized_end = 18652
+ _globals["_COMPRESSIONCODEC"]._serialized_start = 19857
+ _globals["_COMPRESSIONCODEC"]._serialized_end = 19938
_globals["_PLAN"]._serialized_start = 275
_globals["_PLAN"]._serialized_end = 758
_globals["_PLAN_COMPRESSEDOPERATION"]._serialized_start = 477
@@ -270,6 +270,16 @@ if not _descriptor._USE_C_DESCRIPTORS:
_globals["_CLONESESSIONREQUEST"]._serialized_end = 18362
_globals["_CLONESESSIONRESPONSE"]._serialized_start = 18365
_globals["_CLONESESSIONRESPONSE"]._serialized_end = 18569
- _globals["_SPARKCONNECTSERVICE"]._serialized_start = 18655
- _globals["_SPARKCONNECTSERVICE"]._serialized_end = 19692
+ _globals["_GETSTATUSREQUEST"]._serialized_start = 18572
+ _globals["_GETSTATUSREQUEST"]._serialized_end = 19167
+ _globals["_GETSTATUSREQUEST_OPERATIONSTATUSREQUEST"]._serialized_start =
18971
+ _globals["_GETSTATUSREQUEST_OPERATIONSTATUSREQUEST"]._serialized_end =
19087
+ _globals["_GETSTATUSRESPONSE"]._serialized_start = 19170
+ _globals["_GETSTATUSRESPONSE"]._serialized_end = 19855
+ _globals["_GETSTATUSRESPONSE_OPERATIONSTATUS"]._serialized_start = 19428
+ _globals["_GETSTATUSRESPONSE_OPERATIONSTATUS"]._serialized_end = 19855
+
_globals["_GETSTATUSRESPONSE_OPERATIONSTATUS_OPERATIONSTATE"]._serialized_start
= 19625
+
_globals["_GETSTATUSRESPONSE_OPERATIONSTATUS_OPERATIONSTATE"]._serialized_end =
19855
+ _globals["_SPARKCONNECTSERVICE"]._serialized_start = 19941
+ _globals["_SPARKCONNECTSERVICE"]._serialized_end = 21060
# @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/base_pb2.pyi
b/python/pyspark/sql/connect/proto/base_pb2.pyi
index f12c21e5536d..00e98535047c 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/base_pb2.pyi
@@ -4311,3 +4311,270 @@ class
CloneSessionResponse(google.protobuf.message.Message):
) -> None: ...
global___CloneSessionResponse = CloneSessionResponse
+
+class GetStatusRequest(google.protobuf.message.Message):
+ """Next ID: 6"""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ class OperationStatusRequest(google.protobuf.message.Message):
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ OPERATION_IDS_FIELD_NUMBER: builtins.int
+ EXTENSIONS_FIELD_NUMBER: builtins.int
+ @property
+ def operation_ids(
+ self,
+ ) ->
google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]:
+ """Get status of operations with these operation_ids.
+ If unset or empty, returns status of all operations in the session.
+ """
+ @property
+ def extensions(
+ self,
+ ) ->
google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
+ google.protobuf.any_pb2.Any
+ ]:
+ """Extension point for custom operation-level status requests."""
+ def __init__(
+ self,
+ *,
+ operation_ids: collections.abc.Iterable[builtins.str] | None = ...,
+ extensions: collections.abc.Iterable[google.protobuf.any_pb2.Any]
| None = ...,
+ ) -> None: ...
+ def ClearField(
+ self,
+ field_name: typing_extensions.Literal[
+ "extensions", b"extensions", "operation_ids", b"operation_ids"
+ ],
+ ) -> None: ...
+
+ SESSION_ID_FIELD_NUMBER: builtins.int
+ USER_CONTEXT_FIELD_NUMBER: builtins.int
+ CLIENT_TYPE_FIELD_NUMBER: builtins.int
+ CLIENT_OBSERVED_SERVER_SIDE_SESSION_ID_FIELD_NUMBER: builtins.int
+ OPERATION_STATUS_FIELD_NUMBER: builtins.int
+ EXTENSIONS_FIELD_NUMBER: builtins.int
+ session_id: builtins.str
+ """(Required)
+
+ The session_id specifies a Spark session for a user identified by
user_context.user_id.
+ The id should be an UUID string of the format
`00112233-4455-6677-8899-aabbccddeeff`
+ """
+ @property
+ def user_context(self) -> global___UserContext:
+ """(Required)
+
+ user_context.user_id and session_id both identify a unique remote
spark session on the
+ server side.
+ """
+ client_type: builtins.str
+ """(Optional)
+
+ Provides optional information about the client sending the request. This
field
+ can be used for language or version specific information and is only
intended for
+ logging purposes and will not be interpreted by the server.
+ """
+ client_observed_server_side_session_id: builtins.str
+ """(Optional)
+
+ Server-side generated idempotency key from the previous responses (if
any). Server
+ can use this to validate that the server side session has not changed.
+ """
+ @property
+ def operation_status(self) ->
global___GetStatusRequest.OperationStatusRequest:
+ """(Optional)
+
+ Get status of operations in the session.
+ """
+ @property
+ def extensions(
+ self,
+ ) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
+ google.protobuf.any_pb2.Any
+ ]:
+ """Extension point for custom status request types."""
+ def __init__(
+ self,
+ *,
+ session_id: builtins.str = ...,
+ user_context: global___UserContext | None = ...,
+ client_type: builtins.str | None = ...,
+ client_observed_server_side_session_id: builtins.str | None = ...,
+ operation_status: global___GetStatusRequest.OperationStatusRequest |
None = ...,
+ extensions: collections.abc.Iterable[google.protobuf.any_pb2.Any] |
None = ...,
+ ) -> None: ...
+ def HasField(
+ self,
+ field_name: typing_extensions.Literal[
+ "_client_observed_server_side_session_id",
+ b"_client_observed_server_side_session_id",
+ "_client_type",
+ b"_client_type",
+ "_operation_status",
+ b"_operation_status",
+ "client_observed_server_side_session_id",
+ b"client_observed_server_side_session_id",
+ "client_type",
+ b"client_type",
+ "operation_status",
+ b"operation_status",
+ "user_context",
+ b"user_context",
+ ],
+ ) -> builtins.bool: ...
+ def ClearField(
+ self,
+ field_name: typing_extensions.Literal[
+ "_client_observed_server_side_session_id",
+ b"_client_observed_server_side_session_id",
+ "_client_type",
+ b"_client_type",
+ "_operation_status",
+ b"_operation_status",
+ "client_observed_server_side_session_id",
+ b"client_observed_server_side_session_id",
+ "client_type",
+ b"client_type",
+ "extensions",
+ b"extensions",
+ "operation_status",
+ b"operation_status",
+ "session_id",
+ b"session_id",
+ "user_context",
+ b"user_context",
+ ],
+ ) -> None: ...
+ @typing.overload
+ def WhichOneof(
+ self,
+ oneof_group: typing_extensions.Literal[
+ "_client_observed_server_side_session_id",
b"_client_observed_server_side_session_id"
+ ],
+ ) -> typing_extensions.Literal["client_observed_server_side_session_id"] |
None: ...
+ @typing.overload
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_client_type",
b"_client_type"]
+ ) -> typing_extensions.Literal["client_type"] | None: ...
+ @typing.overload
+ def WhichOneof(
+ self, oneof_group: typing_extensions.Literal["_operation_status",
b"_operation_status"]
+ ) -> typing_extensions.Literal["operation_status"] | None: ...
+
+global___GetStatusRequest = GetStatusRequest
+
+class GetStatusResponse(google.protobuf.message.Message):
+ """Next ID: 4"""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ class OperationStatus(google.protobuf.message.Message):
+ """Status information for a single operation."""
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ class _OperationState:
+ ValueType = typing.NewType("ValueType", builtins.int)
+ V: typing_extensions.TypeAlias = ValueType
+
+ class _OperationStateEnumTypeWrapper(
+ google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[
+ GetStatusResponse.OperationStatus._OperationState.ValueType
+ ],
+ builtins.type,
+ ): # noqa: F821
+ DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor
+ OPERATION_STATE_UNSPECIFIED:
GetStatusResponse.OperationStatus._OperationState.ValueType # 0
+ OPERATION_STATE_UNKNOWN:
GetStatusResponse.OperationStatus._OperationState.ValueType # 1
+ OPERATION_STATE_RUNNING:
GetStatusResponse.OperationStatus._OperationState.ValueType # 2
+ OPERATION_STATE_TERMINATING:
GetStatusResponse.OperationStatus._OperationState.ValueType # 3
+ OPERATION_STATE_SUCCEEDED:
GetStatusResponse.OperationStatus._OperationState.ValueType # 4
+ OPERATION_STATE_FAILED:
GetStatusResponse.OperationStatus._OperationState.ValueType # 5
+ OPERATION_STATE_CANCELLED:
GetStatusResponse.OperationStatus._OperationState.ValueType # 6
+
+ class OperationState(_OperationState,
metaclass=_OperationStateEnumTypeWrapper): ...
+ OPERATION_STATE_UNSPECIFIED:
GetStatusResponse.OperationStatus.OperationState.ValueType # 0
+ OPERATION_STATE_UNKNOWN:
GetStatusResponse.OperationStatus.OperationState.ValueType # 1
+ OPERATION_STATE_RUNNING:
GetStatusResponse.OperationStatus.OperationState.ValueType # 2
+ OPERATION_STATE_TERMINATING:
GetStatusResponse.OperationStatus.OperationState.ValueType # 3
+ OPERATION_STATE_SUCCEEDED:
GetStatusResponse.OperationStatus.OperationState.ValueType # 4
+ OPERATION_STATE_FAILED:
GetStatusResponse.OperationStatus.OperationState.ValueType # 5
+ OPERATION_STATE_CANCELLED:
GetStatusResponse.OperationStatus.OperationState.ValueType # 6
+
+ OPERATION_ID_FIELD_NUMBER: builtins.int
+ STATE_FIELD_NUMBER: builtins.int
+ EXTENSIONS_FIELD_NUMBER: builtins.int
+ operation_id: builtins.str
+ """The operation_id of the operation."""
+ state:
global___GetStatusResponse.OperationStatus.OperationState.ValueType
+ """The current status of the operation."""
+ @property
+ def extensions(
+ self,
+ ) ->
google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
+ google.protobuf.any_pb2.Any
+ ]:
+ """Extension point for custom operation-level status fields."""
+ def __init__(
+ self,
+ *,
+ operation_id: builtins.str = ...,
+ state:
global___GetStatusResponse.OperationStatus.OperationState.ValueType = ...,
+ extensions: collections.abc.Iterable[google.protobuf.any_pb2.Any]
| None = ...,
+ ) -> None: ...
+ def ClearField(
+ self,
+ field_name: typing_extensions.Literal[
+ "extensions", b"extensions", "operation_id", b"operation_id",
"state", b"state"
+ ],
+ ) -> None: ...
+
+ SESSION_ID_FIELD_NUMBER: builtins.int
+ SERVER_SIDE_SESSION_ID_FIELD_NUMBER: builtins.int
+ OPERATION_STATUSES_FIELD_NUMBER: builtins.int
+ EXTENSIONS_FIELD_NUMBER: builtins.int
+ session_id: builtins.str
+ """Session id of the session for which the status was requested."""
+ server_side_session_id: builtins.str
+ """Server-side generated idempotency key that the client can use to assert
that the server side
+ session has not changed.
+ """
+ @property
+ def operation_statuses(
+ self,
+ ) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
+ global___GetStatusResponse.OperationStatus
+ ]:
+ """Status information about requested operations."""
+ @property
+ def extensions(
+ self,
+ ) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
+ google.protobuf.any_pb2.Any
+ ]:
+ """Extension point for custom status response types."""
+ def __init__(
+ self,
+ *,
+ session_id: builtins.str = ...,
+ server_side_session_id: builtins.str = ...,
+ operation_statuses:
collections.abc.Iterable[global___GetStatusResponse.OperationStatus]
+ | None = ...,
+ extensions: collections.abc.Iterable[google.protobuf.any_pb2.Any] |
None = ...,
+ ) -> None: ...
+ def ClearField(
+ self,
+ field_name: typing_extensions.Literal[
+ "extensions",
+ b"extensions",
+ "operation_statuses",
+ b"operation_statuses",
+ "server_side_session_id",
+ b"server_side_session_id",
+ "session_id",
+ b"session_id",
+ ],
+ ) -> None: ...
+
+global___GetStatusResponse = GetStatusResponse
diff --git a/python/pyspark/sql/connect/proto/base_pb2_grpc.py
b/python/pyspark/sql/connect/proto/base_pb2_grpc.py
index 16dd20b563f3..14fd88bd0f11 100644
--- a/python/pyspark/sql/connect/proto/base_pb2_grpc.py
+++ b/python/pyspark/sql/connect/proto/base_pb2_grpc.py
@@ -96,6 +96,12 @@ class SparkConnectServiceStub(object):
response_deserializer=spark_dot_connect_dot_base__pb2.CloneSessionResponse.FromString,
_registered_method=True,
)
+ self.GetStatus = channel.unary_unary(
+ "/spark.connect.SparkConnectService/GetStatus",
+
request_serializer=spark_dot_connect_dot_base__pb2.GetStatusRequest.SerializeToString,
+
response_deserializer=spark_dot_connect_dot_base__pb2.GetStatusResponse.FromString,
+ _registered_method=True,
+ )
class SparkConnectServiceServicer(object):
@@ -192,6 +198,12 @@ class SparkConnectServiceServicer(object):
context.set_details("Method not implemented!")
raise NotImplementedError("Method not implemented!")
+ def GetStatus(self, request, context):
+ """Get status information of different types."""
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details("Method not implemented!")
+ raise NotImplementedError("Method not implemented!")
+
def add_SparkConnectServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
@@ -250,6 +262,11 @@ def add_SparkConnectServiceServicer_to_server(servicer,
server):
request_deserializer=spark_dot_connect_dot_base__pb2.CloneSessionRequest.FromString,
response_serializer=spark_dot_connect_dot_base__pb2.CloneSessionResponse.SerializeToString,
),
+ "GetStatus": grpc.unary_unary_rpc_method_handler(
+ servicer.GetStatus,
+
request_deserializer=spark_dot_connect_dot_base__pb2.GetStatusRequest.FromString,
+
response_serializer=spark_dot_connect_dot_base__pb2.GetStatusResponse.SerializeToString,
+ ),
}
generic_handler = grpc.method_handlers_generic_handler(
"spark.connect.SparkConnectService", rpc_method_handlers
@@ -591,3 +608,33 @@ class SparkConnectService(object):
metadata,
_registered_method=True,
)
+
+ @staticmethod
+ def GetStatus(
+ request,
+ target,
+ options=(),
+ channel_credentials=None,
+ call_credentials=None,
+ insecure=False,
+ compression=None,
+ wait_for_ready=None,
+ timeout=None,
+ metadata=None,
+ ):
+ return grpc.experimental.unary_unary(
+ request,
+ target,
+ "/spark.connect.SparkConnectService/GetStatus",
+ spark_dot_connect_dot_base__pb2.GetStatusRequest.SerializeToString,
+ spark_dot_connect_dot_base__pb2.GetStatusResponse.FromString,
+ options,
+ channel_credentials,
+ insecure,
+ call_credentials,
+ compression,
+ wait_for_ready,
+ timeout,
+ metadata,
+ _registered_method=True,
+ )
diff --git a/sql/connect/common/src/main/protobuf/spark/connect/base.proto
b/sql/connect/common/src/main/protobuf/spark/connect/base.proto
index a97d2d25f490..c2e8c5af2c79 100644
--- a/sql/connect/common/src/main/protobuf/spark/connect/base.proto
+++ b/sql/connect/common/src/main/protobuf/spark/connect/base.proto
@@ -1213,11 +1213,94 @@ message CloneSessionResponse {
// Session id of the new cloned session.
string new_session_id = 3;
-
+
// Server-side session ID of the new cloned session.
string new_server_side_session_id = 4;
}
+// Next ID: 6
+message GetStatusRequest {
+ // (Required)
+ //
+ // The session_id specifies a Spark session for a user identified by
user_context.user_id.
+ // The id should be an UUID string of the format
`00112233-4455-6677-8899-aabbccddeeff`
+ string session_id = 1;
+
+ // (Required)
+ //
+ // user_context.user_id and session_id both identify a unique remote spark
session on the
+ // server side.
+ UserContext user_context = 2;
+
+ // (Optional)
+ //
+ // Provides optional information about the client sending the request. This
field
+ // can be used for language or version specific information and is only
intended for
+ // logging purposes and will not be interpreted by the server.
+ optional string client_type = 3;
+
+ // (Optional)
+ //
+ // Server-side generated idempotency key from the previous responses (if
any). Server
+ // can use this to validate that the server side session has not changed.
+ optional string client_observed_server_side_session_id = 4;
+
+ // (Optional)
+ //
+ // Get status of operations in the session.
+ optional OperationStatusRequest operation_status = 5;
+
+ // Extension point for custom status request types.
+ repeated google.protobuf.Any extensions = 999;
+
+ message OperationStatusRequest {
+ // Get status of operations with these operation_ids.
+ // If unset or empty, returns status of all operations in the session.
+ repeated string operation_ids = 1;
+
+ // Extension point for custom operation-level status requests.
+ repeated google.protobuf.Any extensions = 999;
+ }
+}
+
+// Next ID: 4
+message GetStatusResponse {
+ // Session id of the session for which the status was requested.
+ string session_id = 1;
+
+ // Server-side generated idempotency key that the client can use to assert
that the server side
+ // session has not changed.
+ string server_side_session_id = 2;
+
+ // Status information about requested operations.
+ repeated OperationStatus operation_statuses = 3;
+
+ // Extension point for custom status response types.
+ repeated google.protobuf.Any extensions = 999;
+
+ // Status information for a single operation.
+ message OperationStatus {
+ // The operation_id of the operation.
+ string operation_id = 1;
+
+ // The current status of the operation.
+ OperationState state = 2;
+
+ // Extension point for custom operation-level status fields.
+ repeated google.protobuf.Any extensions = 999;
+
+ enum OperationState {
+ OPERATION_STATE_UNSPECIFIED = 0;
+ OPERATION_STATE_UNKNOWN = 1;
+ OPERATION_STATE_RUNNING = 2;
+ OPERATION_STATE_TERMINATING = 3;
+ OPERATION_STATE_SUCCEEDED = 4;
+ OPERATION_STATE_FAILED = 5;
+ OPERATION_STATE_CANCELLED = 6;
+ }
+ }
+}
+
// Main interface for the SparkConnect service.
service SparkConnectService {
@@ -1272,4 +1355,7 @@ service SparkConnectService {
// The request can optionally specify a custom session ID for the cloned
session (must be
// a valid UUID). If not provided, a new UUID will be generated
automatically.
rpc CloneSession(CloneSessionRequest) returns (CloneSessionResponse) {}
+
+ // Get status information of different types.
+ rpc GetStatus(GetStatusRequest) returns (GetStatusResponse) {}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]