Re: [PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]
HyukjinKwon closed pull request #46570: [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect URL: https://github.com/apache/spark/pull/46570 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]
HyukjinKwon commented on PR #46570:
URL: https://github.com/apache/spark/pull/46570#issuecomment-2121464274

Merged to master. I will follow up on the discussion if there is more to address, since we're releasing the preview soon.
Re: [PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]
HyukjinKwon commented on code in PR #46570:
URL: https://github.com/apache/spark/pull/46570#discussion_r1606684213

## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala: ##
@@ -3507,6 +3513,47 @@ class SparkConnectPlanner(
         .build())
   }
 
+  private def handleCheckpointCommand(
+      checkpointCommand: CheckpointCommand,
+      responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit = {
+    val target = Dataset
+      .ofRows(session, transformRelation(checkpointCommand.getRelation))
+    val checkpointed = if (checkpointCommand.hasLocal && checkpointCommand.hasEager) {

Review Comment:
I just tried it locally, but I think it's actually better to keep it as is. The current version is easier to read, and `Dataset.checkpoint(eager: Boolean, reliableCheckpoint: Boolean)` is private as well.
Re: [PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]
HyukjinKwon commented on code in PR #46570:
URL: https://github.com/apache/spark/pull/46570#discussion_r1606678743

## connector/connect/common/src/main/protobuf/spark/connect/commands.proto: ##
@@ -484,3 +486,26 @@ message CreateResourceProfileCommandResult {
   // (Required) Server-side generated resource profile id.
   int32 profile_id = 1;
 }
+
+// Command to remove `CashedRemoteRelation`
+message RemoveCachedRemoteRelationCommand {
+  // (Required) The remote to be related
+  CachedRemoteRelation relation = 1;
+}
+
+message CheckpointCommand {
+  // (Required) The logical plan to checkpoint.
+  Relation relation = 1;
+
+  // (Optional) Locally checkpoint using a local temporary
+  // directory in Spark Connect server (Spark Driver)
+  optional bool local = 2;
+
+  // (Optional) Whether to checkpoint this dataframe immediately.
+  optional bool eager = 3;

Review Comment:
We can, but I actually followed the other cases (see `optional bool` in `relations.proto`).
Re: [PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]
HyukjinKwon commented on code in PR #46570:
URL: https://github.com/apache/spark/pull/46570#discussion_r1606673393

## python/pyspark/sql/connect/dataframe.py: ##
@@ -138,6 +138,41 @@ def __init__(
         # by __repr__ and _repr_html_ while eager evaluation opens.
         self._support_repr_html = False
         self._cached_schema: Optional[StructType] = None
+        self._cached_remote_relation_id: Optional[str] = None
+
+    def __del__(self) -> None:

Review Comment:
and that (previous) maintainer knows this quite well. Although I am now the maintainer of Py4J :-).
Re: [PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]
HyukjinKwon commented on code in PR #46570:
URL: https://github.com/apache/spark/pull/46570#discussion_r1606672819

## python/pyspark/sql/connect/dataframe.py: ##
@@ -138,6 +138,41 @@ def __init__(
         # by __repr__ and _repr_html_ while eager evaluation opens.
         self._support_repr_html = False
         self._cached_schema: Optional[StructType] = None
+        self._cached_remote_relation_id: Optional[str] = None
+
+    def __del__(self) -> None:

Review Comment:
At least Py4J does the same thing (socket connection).
Re: [PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]
hvanhovell commented on code in PR #46570:
URL: https://github.com/apache/spark/pull/46570#discussion_r1606497789

## python/pyspark/sql/connect/dataframe.py: ##
@@ -138,6 +138,41 @@ def __init__(
         # by __repr__ and _repr_html_ while eager evaluation opens.
         self._support_repr_html = False
         self._cached_schema: Optional[StructType] = None
+        self._cached_remote_relation_id: Optional[str] = None
+
+    def __del__(self) -> None:

Review Comment:
I'm no Python GC expert. I am assuming some system thread is doing this work. Is it wise to execute an RPC from there?
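As background for the question above: in CPython, `__del__` normally runs on whichever thread drops the last reference (or happens to trigger a collection cycle), not on a dedicated system thread. The following is a minimal sketch, not the code from this PR, of one way to keep the finalizer cheap by handing the release call to a small worker pool. `FakeClient`, `CheckpointedFrame`, and `remove_cached_relation` are hypothetical names used only for illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class FakeClient:
    """Hypothetical stand-in for a Spark Connect client; it only records calls."""

    def __init__(self):
        self.released = []
        self.threads = []
        self._lock = threading.Lock()

    def remove_cached_relation(self, relation_id):
        # In a real client this would be the RPC that drops the cached relation.
        with self._lock:
            self.released.append(relation_id)
            self.threads.append(threading.current_thread().name)


class CheckpointedFrame:
    """Hypothetical DataFrame-like holder of a server-side cached relation id."""

    # Shared pool so the finalizer never blocks on network I/O itself.
    _release_pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix="release")

    def __init__(self, client, relation_id):
        self._client = client
        self._cached_remote_relation_id = relation_id

    def __del__(self):
        # getattr guards against a partially constructed object.
        relation_id = getattr(self, "_cached_remote_relation_id", None)
        if relation_id is None:
            return
        try:
            type(self)._release_pool.submit(
                self._client.remove_cached_relation, relation_id
            )
        except RuntimeError:
            # The pool refuses new work once it (or the interpreter) is shutting down.
            pass
```

With this shape the finalizer only enqueues work; the release itself runs on a pool thread, so a slow or failing call does not stall the thread that happened to drop the reference.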
Re: [PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]
hvanhovell commented on code in PR #46570:
URL: https://github.com/apache/spark/pull/46570#discussion_r1606484039

## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala: ##
@@ -3507,6 +3513,47 @@ class SparkConnectPlanner(
         .build())
   }
 
+  private def handleCheckpointCommand(
+      checkpointCommand: CheckpointCommand,
+      responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit = {
+    val target = Dataset
+      .ofRows(session, transformRelation(checkpointCommand.getRelation))
+    val checkpointed = if (checkpointCommand.hasLocal && checkpointCommand.hasEager) {

Review Comment:
You could also increase the visibility of `Dataset.checkpoint(eager: Boolean, reliableCheckpoint: Boolean)`.
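The two shapes being weighed in this thread can be sketched as follows. This is an illustrative Python model only, not the planner code: `FakeDataset` is a hypothetical class that mimics public `checkpoint` / `localCheckpoint` entry points delegating to one private two-flag method, and both dispatch functions are assumptions made for the comparison.

```python
class FakeDataset:
    """Hypothetical model of a Dataset with public and private checkpoint methods."""

    def checkpoint(self, eager=True):
        return self._checkpoint(eager, reliable_checkpoint=True)

    def local_checkpoint(self, eager=True):
        return self._checkpoint(eager, reliable_checkpoint=False)

    def _checkpoint(self, eager, reliable_checkpoint):
        # Return a description instead of doing real work.
        kind = "reliable" if reliable_checkpoint else "local"
        mode = "eager" if eager else "lazy"
        return (kind, mode)


def dispatch_public(ds, local, eager):
    # Branch on the flags and call only the public entry points.
    if local:
        return ds.local_checkpoint(eager=eager)
    return ds.checkpoint(eager=eager)


def dispatch_private(ds, local, eager):
    # Single call, but it requires access to the two-flag method.
    return ds._checkpoint(eager, reliable_checkpoint=not local)
```

Both functions produce the same result for every flag combination; the trade-off is between a few extra branches in the caller and widening the visibility of an internal method.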
Re: [PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]
hvanhovell commented on code in PR #46570:
URL: https://github.com/apache/spark/pull/46570#discussion_r1606482770

## connector/connect/common/src/main/protobuf/spark/connect/commands.proto: ##
@@ -484,3 +486,26 @@ message CreateResourceProfileCommandResult {
   // (Required) Server-side generated resource profile id.
   int32 profile_id = 1;
 }
+
+// Command to remove `CashedRemoteRelation`
+message RemoveCachedRemoteRelationCommand {
+  // (Required) The remote to be related
+  CachedRemoteRelation relation = 1;
+}
+
+message CheckpointCommand {
+  // (Required) The logical plan to checkpoint.
+  Relation relation = 1;
+
+  // (Optional) Locally checkpoint using a local temporary
+  // directory in Spark Connect server (Spark Driver)
+  optional bool local = 2;
+
+  // (Optional) Whether to checkpoint this dataframe immediately.
+  optional bool eager = 3;
+}
+
+message CheckpointCommandResult {

Review Comment:
Move this to base.proto, since this is part of the execute response?
Re: [PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]
hvanhovell commented on code in PR #46570:
URL: https://github.com/apache/spark/pull/46570#discussion_r1606482118

## connector/connect/common/src/main/protobuf/spark/connect/commands.proto: ##
@@ -484,3 +486,26 @@ message CreateResourceProfileCommandResult {
   // (Required) Server-side generated resource profile id.
   int32 profile_id = 1;
 }
+
+// Command to remove `CashedRemoteRelation`
+message RemoveCachedRemoteRelationCommand {
+  // (Required) The remote to be related
+  CachedRemoteRelation relation = 1;
+}
+
+message CheckpointCommand {
+  // (Required) The logical plan to checkpoint.
+  Relation relation = 1;
+
+  // (Optional) Locally checkpoint using a local temporary
+  // directory in Spark Connect server (Spark Driver)
+  optional bool local = 2;
+
+  // (Optional) Whether to checkpoint this dataframe immediately.
+  optional bool eager = 3;

Review Comment:
The default is `eager = true`, right? Should the protocol encode this better? Currently the protocol defaults to `eager = false` if the field is not set, so my question is: should we flip the logic (i.e. replace this with `lazy`) so that the default behavior does not require you to set additional fields? The same question applies to `local`.
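The concern above can be made concrete with a small model. This is a sketch under stated assumptions, not the generated protobuf classes: the two dataclasses and both `resolve_*` functions are hypothetical, and the "unset means false" reading is the one described in the review comment.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CheckpointCommand:
    """Hypothetical model of the message as proposed: both flags optional."""

    local: Optional[bool] = None
    eager: Optional[bool] = None


@dataclass
class CheckpointCommandFlipped:
    """Hypothetical alternative: `lazy` replaces `eager`, so unset means eager."""

    local: bool = False
    lazy: bool = False


def resolve_as_proposed(cmd):
    # An unset field reads as False, so an empty message means a lazy checkpoint.
    return {"local": bool(cmd.local), "eager": bool(cmd.eager)}


def resolve_flipped(cmd):
    # An empty message now matches the API default of an eager checkpoint.
    return {"local": cmd.local, "eager": not cmd.lazy}
```

Under this model an empty `CheckpointCommand` resolves to a lazy checkpoint, while an empty `CheckpointCommandFlipped` resolves to an eager one, which is the behavior a caller gets from `checkpoint()` with no arguments. The `local` field needs no flip in this sketch, because unset already maps to a non-local checkpoint.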
Re: [PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]
hvanhovell commented on code in PR #46570:
URL: https://github.com/apache/spark/pull/46570#discussion_r1605509972

## connector/connect/common/src/main/protobuf/spark/connect/base.proto: ##
@@ -199,6 +200,17 @@ message AnalyzePlanRequest {
     // (Required) The logical plan to get the storage level.
     Relation relation = 1;
   }
+
+  message Checkpoint {

Review Comment:
Caching is lazy, so it cannot trigger execution of the main query. Checkpoint can actually do this, which is weird for an analyze request. Moreover, Analyze does not work well with long-running operations (i.e. an eager checkpoint) for the following reasons:
- Analyze needs a thread in the server's thread pool, which in extreme cases can lead to exhaustion, or in a less extreme scenario can lead to reduced availability.
- Long-running requests can be killed by intermediate proxies. Analyze does not support reattach, which is an issue when the checkpoint takes too long. In that case you are basically dead in the water.

The argument that ExecutePlanResponse does not support this is IMO a bit flimsy. Look, for example, at `SqlCommandResult`: it can be returned as part of the `ExecutePlanResponse`, and it returns a relation.
Re: [PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]
HyukjinKwon commented on code in PR #46570:
URL: https://github.com/apache/spark/pull/46570#discussion_r1604289643

## python/pyspark/sql/connect/dataframe.py: ##
@@ -104,6 +107,8 @@ class DataFrame(ParentDataFrame):
+    _release_thread_pool: Optional[ThreadPool] = ThreadPool(os.cpu_count() if os.cpu_count() else 8)

Review Comment:
Oops
Re: [PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]
zhengruifeng commented on code in PR #46570:
URL: https://github.com/apache/spark/pull/46570#discussion_r1604267324

## python/pyspark/sql/connect/dataframe.py: ##
@@ -104,6 +107,8 @@ class DataFrame(ParentDataFrame):
+    _release_thread_pool: Optional[ThreadPool] = ThreadPool(os.cpu_count() if os.cpu_count() else 8)

Review Comment:
Where is this used?
[PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]
HyukjinKwon opened a new pull request, #46570:
URL: https://github.com/apache/spark/pull/46570

### What changes were proposed in this pull request?
TBD

### Why are the changes needed?
TBD

### Does this PR introduce _any_ user-facing change?
TBD

### How was this patch tested?
TBD

### Was this patch authored or co-authored using generative AI tooling?
No.