Re: [PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]

2024-05-20 Thread via GitHub


HyukjinKwon closed pull request #46570: [SPARK-48258][PYTHON][CONNECT] 
Checkpoint and localCheckpoint in Spark Connect
URL: https://github.com/apache/spark/pull/46570


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]

2024-05-20 Thread via GitHub


HyukjinKwon commented on PR #46570:
URL: https://github.com/apache/spark/pull/46570#issuecomment-2121464274

   Merged to master.
   
   I will follow up on the discussion if there is more to address, since we're 
releasing the preview soon.





Re: [PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]

2024-05-20 Thread via GitHub


HyukjinKwon commented on code in PR #46570:
URL: https://github.com/apache/spark/pull/46570#discussion_r1606684213


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##
@@ -3507,6 +3513,47 @@ class SparkConnectPlanner(
 .build())
   }
 
+  private def handleCheckpointCommand(
+      checkpointCommand: CheckpointCommand,
+      responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit = {
+    val target = Dataset
+      .ofRows(session, transformRelation(checkpointCommand.getRelation))
+    val checkpointed = if (checkpointCommand.hasLocal && checkpointCommand.hasEager) {

Review Comment:
   I tried this locally, but I think it's actually better to keep it as is. The 
current version is easier to read, and `Dataset.checkpoint(eager: Boolean, 
reliableCheckpoint: Boolean)` is also private.






Re: [PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]

2024-05-20 Thread via GitHub


HyukjinKwon commented on code in PR #46570:
URL: https://github.com/apache/spark/pull/46570#discussion_r1606678743


##
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##
@@ -484,3 +486,26 @@ message CreateResourceProfileCommandResult {
   // (Required) Server-side generated resource profile id.
   int32 profile_id = 1;
 }
+
+// Command to remove `CachedRemoteRelation`
+message RemoveCachedRemoteRelationCommand {
+  // (Required) The cached remote relation to remove.
+  CachedRemoteRelation relation = 1;
+}
+
+message CheckpointCommand {
+  // (Required) The logical plan to checkpoint.
+  Relation relation = 1;
+
+  // (Optional) Locally checkpoint using a local temporary
+  // directory in Spark Connect server (Spark Driver)
+  optional bool local = 2;
+
+  // (Optional) Whether to checkpoint this dataframe immediately.
+  optional bool eager = 3;

Review Comment:
   We could, but I followed the existing convention here (see the `optional bool` 
fields in `relations.proto`).






Re: [PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]

2024-05-20 Thread via GitHub


HyukjinKwon commented on code in PR #46570:
URL: https://github.com/apache/spark/pull/46570#discussion_r1606673393


##
python/pyspark/sql/connect/dataframe.py:
##
@@ -138,6 +138,41 @@ def __init__(
         # by __repr__ and _repr_html_ while eager evaluation opens.
         self._support_repr_html = False
         self._cached_schema: Optional[StructType] = None
+        self._cached_remote_relation_id: Optional[str] = None
+
+    def __del__(self) -> None:

Review Comment:
   and that (previous) maintainer knows this quite well. Although I'm now the 
Py4J maintainer myself :-).






Re: [PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]

2024-05-20 Thread via GitHub


HyukjinKwon commented on code in PR #46570:
URL: https://github.com/apache/spark/pull/46570#discussion_r1606672819


##
python/pyspark/sql/connect/dataframe.py:
##
@@ -138,6 +138,41 @@ def __init__(
         # by __repr__ and _repr_html_ while eager evaluation opens.
         self._support_repr_html = False
         self._cached_schema: Optional[StructType] = None
+        self._cached_remote_relation_id: Optional[str] = None
+
+    def __del__(self) -> None:

Review Comment:
   At least Py4J does the same thing (socket connection).






Re: [PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]

2024-05-20 Thread via GitHub


hvanhovell commented on code in PR #46570:
URL: https://github.com/apache/spark/pull/46570#discussion_r1606497789


##
python/pyspark/sql/connect/dataframe.py:
##
@@ -138,6 +138,41 @@ def __init__(
         # by __repr__ and _repr_html_ while eager evaluation opens.
         self._support_repr_html = False
         self._cached_schema: Optional[StructType] = None
+        self._cached_remote_relation_id: Optional[str] = None
+
+    def __del__(self) -> None:

Review Comment:
   I'm no Python GC expert, but I assume some system thread does this work. Is 
it wise to execute an RPC from there?
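For context on this question: in CPython, `__del__` runs on whichever thread drops the last reference to the object (or triggers the cyclic garbage collector). It does not run on a dedicated GC thread, so a blocking RPC inside it stalls that thread. One common mitigation is to keep the finalizer trivial and hand the RPC to a worker pool. The `_release_thread_pool` class attribute quoted elsewhere in this thread suggests something similar was at least drafted. The following is a minimal sketch of that pattern using `concurrent.futures`, not the PR's implementation. `CachedRelationHandle`, `release_rpc`, and `fake_release_rpc` are hypothetical names.

```python
import os
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Optional

# Hypothetical module-level pool. os.cpu_count() can return None, hence "or 8".
_release_pool = ThreadPoolExecutor(max_workers=os.cpu_count() or 8)


class CachedRelationHandle:
    """Hypothetical client-side handle owning a server-side cached relation."""

    def __init__(self, relation_id: str, release_rpc: Callable[[str], None]) -> None:
        self._release_rpc = release_rpc
        self._relation_id: Optional[str] = relation_id

    def __del__(self) -> None:
        # __init__ may have failed part-way, so read the attribute defensively.
        relation_id = getattr(self, "_relation_id", None)
        if relation_id is None:
            return
        try:
            # Keep the finalizer cheap: queue the blocking RPC on a worker thread.
            _release_pool.submit(self._release_rpc, relation_id)
        except RuntimeError:
            # The pool or the interpreter is shutting down; drop the request.
            pass


# Usage with a fake RPC that only records which relation it was asked to release.
released: List[str] = []
_released_lock = threading.Lock()


def fake_release_rpc(relation_id: str) -> None:
    with _released_lock:
        released.append(relation_id)


handle = CachedRelationHandle("relation-1", fake_release_rpc)
del handle  # on CPython, reference counting runs __del__ right here
_release_pool.shutdown(wait=True)  # wait for the queued release to complete
print(released)  # ['relation-1'] on CPython
```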






Re: [PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]

2024-05-20 Thread via GitHub


hvanhovell commented on code in PR #46570:
URL: https://github.com/apache/spark/pull/46570#discussion_r1606484039


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##
@@ -3507,6 +3513,47 @@ class SparkConnectPlanner(
 .build())
   }
 
+  private def handleCheckpointCommand(
+      checkpointCommand: CheckpointCommand,
+      responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit = {
+    val target = Dataset
+      .ofRows(session, transformRelation(checkpointCommand.getRelation))
+    val checkpointed = if (checkpointCommand.hasLocal && checkpointCommand.hasEager) {

Review Comment:
   You could also increase the visibility of `Dataset.checkpoint(eager: 
Boolean, reliableCheckpoint: Boolean)`.
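This thread weighs two shapes: branching per flag, or widening the two-flag `Dataset.checkpoint`. A small Python model can compare them. It is a hypothetical sketch, and `FakeDataset`, `_checkpoint`, `dispatch_branching`, and `dispatch_direct` are invented names. It makes three assumptions:

* As in Spark's Scala `Dataset`, `checkpoint(eager)` and `localCheckpoint(eager)` delegate to `checkpoint(eager, reliableCheckpoint)`, with `reliableCheckpoint` set to `true` and `false` respectively.
* `eager` defaults to `true`.
* Only the first line of the `if` chain is quoted above, so the remaining branches are assumed to pick the method by the value of `local`.

```python
from typing import List, Optional, Tuple


class FakeDataset:
    """Hypothetical stand-in recording each checkpoint request as (eager, reliable)."""

    def __init__(self) -> None:
        self.calls: List[Tuple[bool, bool]] = []

    def _checkpoint(self, eager: bool, reliable_checkpoint: bool) -> "FakeDataset":
        # Models the private Scala Dataset.checkpoint(eager, reliableCheckpoint).
        self.calls.append((eager, reliable_checkpoint))
        return self

    def checkpoint(self, eager: bool = True) -> "FakeDataset":
        return self._checkpoint(eager, reliable_checkpoint=True)

    def local_checkpoint(self, eager: bool = True) -> "FakeDataset":
        return self._checkpoint(eager, reliable_checkpoint=False)


def dispatch_branching(ds: FakeDataset, local: Optional[bool], eager: Optional[bool]) -> FakeDataset:
    # Branch-per-case shape: choose a public method, forward eager only when set.
    if local:
        return ds.local_checkpoint(eager) if eager is not None else ds.local_checkpoint()
    return ds.checkpoint(eager) if eager is not None else ds.checkpoint()


def dispatch_direct(ds: FakeDataset, local: Optional[bool], eager: Optional[bool]) -> FakeDataset:
    # Suggested shape: a single call to the widened two-flag method.
    return ds._checkpoint(
        eager=True if eager is None else eager,
        reliable_checkpoint=not local,  # None (unset) and False both mean reliable
    )


# None models "field not set on the wire".
for local in (None, False, True):
    for eager in (None, False, True):
        branched = dispatch_branching(FakeDataset(), local, eager).calls
        direct = dispatch_direct(FakeDataset(), local, eager).calls
        assert branched == direct, (local, eager)
print("both shapes agree on all 9 flag combinations")
```

Under these assumptions the two shapes behave identically. The choice is then about readability and about widening a private API, which is the point of the reply to this comment.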






Re: [PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]

2024-05-20 Thread via GitHub


hvanhovell commented on code in PR #46570:
URL: https://github.com/apache/spark/pull/46570#discussion_r1606482770


##
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##
@@ -484,3 +486,26 @@ message CreateResourceProfileCommandResult {
   // (Required) Server-side generated resource profile id.
   int32 profile_id = 1;
 }
+
+// Command to remove `CachedRemoteRelation`
+message RemoveCachedRemoteRelationCommand {
+  // (Required) The cached remote relation to remove.
+  CachedRemoteRelation relation = 1;
+}
+
+message CheckpointCommand {
+  // (Required) The logical plan to checkpoint.
+  Relation relation = 1;
+
+  // (Optional) Locally checkpoint using a local temporary
+  // directory in Spark Connect server (Spark Driver)
+  optional bool local = 2;
+
+  // (Optional) Whether to checkpoint this dataframe immediately.
+  optional bool eager = 3;
+}
+
+message CheckpointCommandResult {

Review Comment:
   Should this move to base.proto, since it is part of the execute response?






Re: [PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]

2024-05-20 Thread via GitHub


hvanhovell commented on code in PR #46570:
URL: https://github.com/apache/spark/pull/46570#discussion_r1606482118


##
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##
@@ -484,3 +486,26 @@ message CreateResourceProfileCommandResult {
   // (Required) Server-side generated resource profile id.
   int32 profile_id = 1;
 }
+
+// Command to remove `CachedRemoteRelation`
+message RemoveCachedRemoteRelationCommand {
+  // (Required) The cached remote relation to remove.
+  CachedRemoteRelation relation = 1;
+}
+
+message CheckpointCommand {
+  // (Required) The logical plan to checkpoint.
+  Relation relation = 1;
+
+  // (Optional) Locally checkpoint using a local temporary
+  // directory in Spark Connect server (Spark Driver)
+  optional bool local = 2;
+
+  // (Optional) Whether to checkpoint this dataframe immediately.
+  optional bool eager = 3;

Review Comment:
   The default is `eager = true`, right? Should the protocol encode this 
better? Currently the protocol defaults to `eager = false` if the field is not 
set, so my question is: should we flip the logic (i.e. replace this with `lazy`) 
so that the default behavior does not require setting additional fields?
   
   The same question applies to `local`.
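The underlying issue is proto3 default handling. An unset scalar reads as its zero value (`false` for a `bool`). Only `optional` fields let the reader tell "unset" apart from "explicitly false". A minimal Python sketch follows. It uses a plain dict as a hypothetical stand-in for a decoded `CheckpointCommand`, not the generated protobuf classes, and takes the user-facing default `eager = true` from the comment above.

```python
from typing import Dict

# Hypothetical stand-in for a decoded CheckpointCommand: a field is a key only if
# the client explicitly set it, mirroring proto3 `optional` field presence.
Message = Dict[str, bool]


def read_bool(msg: Message, field: str) -> bool:
    # proto3 getter semantics: an unset bool reads as its zero value, False.
    return msg.get(field, False)


def has_field(msg: Message, field: str) -> bool:
    # Presence check, analogous to HasField("...") in Python or hasX() in Java/Scala.
    return field in msg


def eager_with_presence(msg: Message) -> bool:
    # Current shape (`optional bool eager`): the reader must check presence
    # to apply the user-facing default of eager = true.
    return read_bool(msg, "eager") if has_field(msg, "eager") else True


def eager_without_presence(msg: Message) -> bool:
    # Same field, read by code that skips the presence check.
    return read_bool(msg, "eager")


def eager_from_lazy(msg: Message) -> bool:
    # Flipped shape from the review: a `lazy` flag whose zero value is the default.
    return not read_bool(msg, "lazy")


empty: Message = {}  # the client sent no flags at all
print(eager_with_presence(empty), eager_without_presence(empty), eager_from_lazy(empty))
# True False True
```

With the flipped `lazy` field, an empty message already yields the default behavior, and no reader needs a presence check.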






Re: [PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]

2024-05-17 Thread via GitHub


hvanhovell commented on code in PR #46570:
URL: https://github.com/apache/spark/pull/46570#discussion_r1605509972


##
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##
@@ -199,6 +200,17 @@ message AnalyzePlanRequest {
 // (Required) The logical plan to get the storage level.
 Relation relation = 1;
   }
+
+  message Checkpoint {

Review Comment:
   Caching is lazy, so it cannot trigger execution of the main query. 
Checkpoint, however, can, which is weird for an analyze request. Moreover, 
Analyze does not work well with long-running operations (e.g. an eager 
checkpoint) for the following reasons:
   - Analyze needs a thread in the server's thread pool, which in extreme cases 
can lead to exhaustion or, in a less extreme scenario, to reduced availability.
   - Long-running requests can be killed by intermediate proxies. Analyze does 
not support reattach, which is an issue when the checkpoint takes too long. In 
that case you are basically dead in the water.
   
   The argument that ExecutePlanResponse does not support this is IMO a bit 
flimsy. Look, for example, at `SqlCommandResult`: it can be returned as part 
of the `ExecutePlanResponse`, and it returns a relation.
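A toy model can illustrate the first bullet (thread-pool exhaustion). This is not Spark Connect server code. It assumes a hypothetical handler pool of two threads. `concurrent.futures.ThreadPoolExecutor` and a `threading.Event` stand in for long-running eager checkpoints that occupy those threads while a cheap request waits in the queue.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Toy handler pool with a hypothetical size of two worker threads.
pool = ThreadPoolExecutor(max_workers=2)
gate = threading.Event()


def eager_checkpoint() -> str:
    # Stands in for a long-running eager checkpoint holding a handler thread.
    gate.wait()
    return "checkpointed"


def cheap_analyze() -> str:
    # Stands in for a quick request (e.g. a schema lookup) from another client.
    return "schema"


slow = [pool.submit(eager_checkpoint) for _ in range(2)]
quick = pool.submit(cheap_analyze)

# Both workers are parked inside eager_checkpoint, so the cheap request is still queued.
starved = not quick.done()

gate.set()  # let the long-running operations finish
result = quick.result(timeout=5)  # only now can the cheap request run
pool.shutdown(wait=True)
print(starved, result)  # True schema
```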






Re: [PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]

2024-05-16 Thread via GitHub


HyukjinKwon commented on code in PR #46570:
URL: https://github.com/apache/spark/pull/46570#discussion_r1604289643


##
python/pyspark/sql/connect/dataframe.py:
##
@@ -104,6 +107,8 @@
 
 
 class DataFrame(ParentDataFrame):
+    _release_thread_pool: Optional[ThreadPool] = ThreadPool(os.cpu_count() if os.cpu_count() else 8)

Review Comment:
   Oops






Re: [PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]

2024-05-16 Thread via GitHub


zhengruifeng commented on code in PR #46570:
URL: https://github.com/apache/spark/pull/46570#discussion_r1604267324


##
python/pyspark/sql/connect/dataframe.py:
##
@@ -104,6 +107,8 @@
 
 
 class DataFrame(ParentDataFrame):
+    _release_thread_pool: Optional[ThreadPool] = ThreadPool(os.cpu_count() if os.cpu_count() else 8)

Review Comment:
   where is this used?






[PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]

2024-05-13 Thread via GitHub


HyukjinKwon opened a new pull request, #46570:
URL: https://github.com/apache/spark/pull/46570

   ### What changes were proposed in this pull request?
   
   TBD
   
   ### Why are the changes needed?
   
   TBD
   
   ### Does this PR introduce _any_ user-facing change?
   
   TBD
   
   ### How was this patch tested?
   
   TBD
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.

