This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 43cedc0b3dea [SPARK-46812][CONNECT][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile
43cedc0b3dea is described below

commit 43cedc0b3deabc388896ff71ae0312b34b638f4b
Author: Bobby Wang <wbo4...@gmail.com>
AuthorDate: Tue Apr 2 09:11:23 2024 +0900

    [SPARK-46812][CONNECT][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile
    
    ### What changes were proposed in this pull request?
    
    Support stage-level scheduling for the PySpark Connect DataFrame APIs (mapInPandas and mapInArrow).
    The client issues a CreateResourceProfileCommand to build and register the ResourceProfile on the server side, then attaches the returned profile_id to the MapPartitions relation.
    
    ### Why are the changes needed?
    
    https://github.com/apache/spark/pull/44852 added ResourceProfile support to mapInPandas/mapInArrow for SQL, so it's the right time to enable it for Spark Connect.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Users can pass a ResourceProfile to mapInPandas/mapInArrow through the PySpark Connect client.
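
    For example (a minimal sketch; the endpoint `sc://localhost` is illustrative and assumes a running Spark Connect server):

    ```python
    from pyspark.resource import ResourceProfileBuilder, TaskResourceRequests
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.remote("sc://localhost").getOrCreate()

    # Request 2 CPUs per task for the stages that run the mapped function.
    treqs = TaskResourceRequests().cpus(2)
    rp = ResourceProfileBuilder().require(treqs).build  # note: `build` is a property

    df = spark.range(10)
    # The profile is created on the server on first use; its id is attached to
    # the MapPartitions relation so the stage is scheduled with these resources.
    df.mapInPandas(lambda batches: batches, df.schema, False, rp).collect()
    ```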
    
    ### How was this patch tested?
    
    Pass the CIs and manual tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #45232 from wbo4958/connect-rp.
    
    Authored-by: Bobby Wang <wbo4...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../src/main/protobuf/spark/connect/base.proto     |   4 +-
 .../src/main/protobuf/spark/connect/commands.proto |  13 +
 .../src/main/protobuf/spark/connect/common.proto   |  35 +++
 .../main/protobuf/spark/connect/relations.proto    |   4 +
 .../sql/connect/planner/SparkConnectPlanner.scala  |  58 +++-
 dev/check_pyspark_custom_errors.py                 |   1 +
 dev/sparktestsupport/modules.py                    |   2 +
 python/pyspark/resource/profile.py                 |  40 ++-
 python/pyspark/resource/requests.py                |   8 +-
 ...test_resources.py => test_connect_resources.py} |  61 +++--
 python/pyspark/resource/tests/test_resources.py    |  13 +
 python/pyspark/sql/connect/client/core.py          |  12 +
 python/pyspark/sql/connect/dataframe.py            |  18 +-
 python/pyspark/sql/connect/plan.py                 |   5 +
 python/pyspark/sql/connect/proto/base_pb2.py       | 204 +++++++-------
 python/pyspark/sql/connect/proto/base_pb2.pyi      |  13 +
 python/pyspark/sql/connect/proto/commands_pb2.py   | 184 ++++++-------
 python/pyspark/sql/connect/proto/commands_pb2.pyi  |  49 ++++
 python/pyspark/sql/connect/proto/common_pb2.py     |  16 +-
 python/pyspark/sql/connect/proto/common_pb2.pyi    | 175 ++++++++++++
 python/pyspark/sql/connect/proto/relations_pb2.py  | 299 +++++++++++----------
 python/pyspark/sql/connect/proto/relations_pb2.pyi |  17 ++
 python/pyspark/sql/connect/resource/__init__.py    |  16 ++
 python/pyspark/sql/connect/resource/profile.py     |  69 +++++
 python/pyspark/sql/tests/connect/test_resources.py |  38 +++
 25 files changed, 960 insertions(+), 394 deletions(-)

diff --git a/connector/connect/common/src/main/protobuf/spark/connect/base.proto b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
index bcc7edc55550..9a9121d84f76 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
@@ -375,6 +375,9 @@ message ExecutePlanResponse {
    // Response type informing if the stream is complete in reattachable execution.
     ResultComplete result_complete = 14;
 
+    // Response for command that creates ResourceProfile.
+    CreateResourceProfileCommandResult create_resource_profile_command_result = 17;
+
     // Support arbitrary result objects.
     google.protobuf.Any extension = 999;
   }
@@ -1069,4 +1072,3 @@ service SparkConnectService {
  // FetchErrorDetails retrieves the matched exception with details based on a provided error id.
  rpc FetchErrorDetails(FetchErrorDetailsRequest) returns (FetchErrorDetailsResponse) {}
 }
-
diff --git a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
index ad4bbfd8392d..e0ccf01fe92e 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
@@ -44,6 +44,7 @@ message Command {
     CommonInlineUserDefinedTableFunction register_table_function = 10;
     StreamingQueryListenerBusCommand streaming_query_listener_bus_command = 11;
     CommonInlineUserDefinedDataSource register_data_source = 12;
+    CreateResourceProfileCommand create_resource_profile_command = 13;
 
    // This field is used to mark extensions to the protocol. When plugins generate arbitrary
    // Commands they can add them here. During the planning the correct resolution is done.
@@ -468,3 +469,15 @@ message GetResourcesCommand { }
 message GetResourcesCommandResult {
   map<string, ResourceInformation> resources = 1;
 }
+
+// Command to create ResourceProfile
+message CreateResourceProfileCommand {
+  // (Required) The ResourceProfile to be built on the server-side.
+  ResourceProfile profile = 1;
+}
+
+// Response for command 'CreateResourceProfileCommand'.
+message CreateResourceProfileCommandResult {
+  // (Required) Server-side generated resource profile id.
+  int32 profile_id = 1;
+}
diff --git a/connector/connect/common/src/main/protobuf/spark/connect/common.proto b/connector/connect/common/src/main/protobuf/spark/connect/common.proto
index 5c538cf10825..da334bfd9ee8 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/common.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/common.proto
@@ -46,3 +46,38 @@ message ResourceInformation {
   // (Required) An array of strings describing the addresses of the resource.
   repeated string addresses = 2;
 }
+
+// An executor resource request.
+message ExecutorResourceRequest {
+  // (Required) resource name.
+  string resource_name = 1;
+
+  // (Required) resource amount requesting.
+  int64 amount = 2;
+
+  // Optional script used to discover the resources.
+  optional string discovery_script = 3;
+
+  // Optional vendor, required for some cluster managers.
+  optional string vendor = 4;
+}
+
+// A task resource request.
+message TaskResourceRequest {
+  // (Required) resource name.
+  string resource_name = 1;
+
+  // (Required) resource amount requesting as a double to support fractional
+  // resource requests.
+  double amount = 2;
+}
+
+message ResourceProfile {
+  // (Optional) Resource requests for executors. Mapped from the resource name
+  // (e.g., cores, memory, CPU) to its specific request.
+  map<string, ExecutorResourceRequest> executor_resources = 1;
+
+  // (Optional) Resource requests for tasks. Mapped from the resource name
+  // (e.g., cores, memory, CPU) to its specific request.
+  map<string, TaskResourceRequest> task_resources = 2;
+}
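
For reference, a minimal client-side sketch of populating the messages above (assuming the generated module `python/pyspark/sql/connect/proto/common_pb2.py` listed in the diffstat; field names follow the proto definitions in this hunk):

```python
from pyspark.sql.connect.proto import common_pb2

# A ResourceProfile with one task-level request: 2 CPUs per task.
profile = common_pb2.ResourceProfile()
treq = common_pb2.TaskResourceRequest(resource_name="cpus", amount=2.0)
profile.task_resources["cpus"].CopyFrom(treq)

# An executor-level request; discovery_script and vendor are optional fields.
ereq = common_pb2.ExecutorResourceRequest(resource_name="gpu", amount=1)
profile.executor_resources["gpu"].CopyFrom(ereq)
```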
diff --git a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
index 1e93aaf9f20e..4d4324ed340b 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
@@ -23,6 +23,7 @@ import "google/protobuf/any.proto";
 import "spark/connect/expressions.proto";
 import "spark/connect/types.proto";
 import "spark/connect/catalog.proto";
+import "spark/connect/common.proto";
 
 option java_multiple_files = true;
 option java_package = "org.apache.spark.connect.proto";
@@ -893,6 +894,9 @@ message MapPartitions {
 
   // (Optional) Whether to use barrier mode execution or not.
   optional bool is_barrier = 3;
+
+  // (Optional) ResourceProfile id used for the stage level scheduling.
+  optional int32 profile_id = 4;
 }
 
 message GroupMap {
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index b12e31068683..313c17c25473 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -32,16 +32,14 @@ import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.spark.{Partition, SparkEnv, TaskContext}
 import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
 import org.apache.spark.connect.proto
-import org.apache.spark.connect.proto.{ExecutePlanResponse, SqlCommand, StreamingQueryCommand, StreamingQueryCommandResult, StreamingQueryInstanceId, WriteStreamOperationStart, WriteStreamOperationStartResult}
+import org.apache.spark.connect.proto.{CreateResourceProfileCommand, ExecutePlanResponse, SqlCommand, StreamingForeachFunction, StreamingQueryCommand, StreamingQueryCommandResult, StreamingQueryInstanceId, StreamingQueryManagerCommand, StreamingQueryManagerCommandResult, WriteStreamOperationStart, WriteStreamOperationStartResult}
 import org.apache.spark.connect.proto.ExecutePlanResponse.SqlCommandResult
 import org.apache.spark.connect.proto.Parse.ParseFormat
-import org.apache.spark.connect.proto.StreamingForeachFunction
-import org.apache.spark.connect.proto.StreamingQueryManagerCommand
-import org.apache.spark.connect.proto.StreamingQueryManagerCommandResult
 import org.apache.spark.connect.proto.StreamingQueryManagerCommandResult.StreamingQueryInstance
 import org.apache.spark.connect.proto.WriteStreamOperationStart.TriggerCase
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.{functions => MLFunctions}
+import org.apache.spark.resource.{ExecutorResourceRequest, ResourceProfile, TaskResourceProfile, TaskResourceRequest}
 import org.apache.spark.sql.{Column, Dataset, Encoders, ForeachWriter, Observation, RelationalGroupedDataset, SparkSession}
 import org.apache.spark.sql.avro.{AvroDataToCatalyst, CatalystDataToAvro}
 import org.apache.spark.sql.catalyst.{expressions, AliasIdentifier, FunctionIdentifier}
@@ -544,6 +542,12 @@ class SparkConnectPlanner(
       case proto.CommonInlineUserDefinedFunction.FunctionCase.PYTHON_UDF =>
         val pythonUdf = transformPythonUDF(commonUdf)
         val isBarrier = if (rel.hasIsBarrier) rel.getIsBarrier else false
+        val profile = if (rel.hasProfileId) {
+          val profileId = rel.getProfileId
+          Some(session.sparkContext.resourceProfileManager.resourceProfileFromId(profileId))
+        } else {
+          None
+        }
         pythonUdf.evalType match {
           case PythonEvalType.SQL_MAP_PANDAS_ITER_UDF =>
             logical.MapInPandas(
@@ -551,14 +555,14 @@ class SparkConnectPlanner(
               
               DataTypeUtils.toAttributes(pythonUdf.dataType.asInstanceOf[StructType]),
               baseRel,
               isBarrier,
-              None)
+              profile)
           case PythonEvalType.SQL_MAP_ARROW_ITER_UDF =>
             logical.MapInArrow(
               pythonUdf,
               DataTypeUtils.toAttributes(pythonUdf.dataType.asInstanceOf[StructType]),
               baseRel,
               isBarrier,
-              None)
+              profile)
           case _ =>
             throw InvalidPlanInput(
               s"Function with EvalType: ${pythonUdf.evalType} is not supported")
@@ -2531,6 +2535,11 @@ class SparkConnectPlanner(
           responseObserver)
       case proto.Command.CommandTypeCase.GET_RESOURCES_COMMAND =>
         handleGetResourcesCommand(responseObserver)
+      case proto.Command.CommandTypeCase.CREATE_RESOURCE_PROFILE_COMMAND =>
+        handleCreateResourceProfileCommand(
+          command.getCreateResourceProfileCommand,
+          responseObserver)
+
       case _ => throw new UnsupportedOperationException(s"$command not supported.")
     }
   }
@@ -3327,6 +3336,43 @@ class SparkConnectPlanner(
         .build())
   }
 
+  def handleCreateResourceProfileCommand(
+      createResourceProfileCommand: CreateResourceProfileCommand,
+      responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit = {
+    val rp = createResourceProfileCommand.getProfile
+    val ereqs = rp.getExecutorResourcesMap.asScala.map { case (name, res) =>
+      name -> new ExecutorResourceRequest(
+        res.getResourceName,
+        res.getAmount,
+        res.getDiscoveryScript,
+        res.getVendor)
+    }.toMap
+    val treqs = rp.getTaskResourcesMap.asScala.map { case (name, res) =>
+      name -> new TaskResourceRequest(res.getResourceName, res.getAmount)
+    }.toMap
+
+    // Create the ResourceProfile and add it to the ResourceProfileManager
+    val profile = if (ereqs.isEmpty) {
+      new TaskResourceProfile(treqs)
+    } else {
+      new ResourceProfile(ereqs, treqs)
+    }
+    session.sparkContext.resourceProfileManager.addResourceProfile(profile)
+
+    executeHolder.eventsManager.postFinished()
+    responseObserver.onNext(
+      proto.ExecutePlanResponse
+        .newBuilder()
+        .setSessionId(sessionId)
+        .setServerSideSessionId(sessionHolder.serverSessionId)
+        .setCreateResourceProfileCommandResult(
+          proto.CreateResourceProfileCommandResult
+            .newBuilder()
+            .setProfileId(profile.id)
+            .build())
+        .build())
+  }
+
   private val emptyLocalRelation = LocalRelation(
     output = AttributeReference("value", StringType, false)() :: Nil,
     data = Seq.empty)
diff --git a/dev/check_pyspark_custom_errors.py b/dev/check_pyspark_custom_errors.py
index d689f7905ea1..bce73c84028a 100644
--- a/dev/check_pyspark_custom_errors.py
+++ b/dev/check_pyspark_custom_errors.py
@@ -176,6 +176,7 @@ if __name__ == "__main__":
     TARGET_PATHS = ["python/pyspark/sql"]
     EXCLUDE_PATHS = [
         "python/pyspark/sql/tests",
+        "python/pyspark/sql/connect/resource",
         "python/pyspark/sql/connect/proto",
     ]
 
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index efc96c145393..7604e8f4b0a0 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -570,6 +570,7 @@ pyspark_resource = Module(
         "pyspark.resource.profile",
         # unittests
         "pyspark.resource.tests.test_resources",
+        "pyspark.resource.tests.test_connect_resources",
     ],
 )
 
@@ -1058,6 +1059,7 @@ pyspark_connect = Module(
         "pyspark.sql.tests.connect.test_parity_pandas_udf_scalar",
         "pyspark.sql.tests.connect.test_parity_pandas_udf_grouped_agg",
         "pyspark.sql.tests.connect.test_parity_pandas_udf_window",
+        "pyspark.sql.tests.connect.test_resources",
     ],
     excluded_python_implementations=[
         "PyPy"  # Skip these tests under PyPy since they require numpy, pandas, and pyarrow and
diff --git a/python/pyspark/resource/profile.py b/python/pyspark/resource/profile.py
index 237f049bfb56..a982f608c196 100644
--- a/python/pyspark/resource/profile.py
+++ b/python/pyspark/resource/profile.py
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
+from threading import RLock
 from typing import overload, Dict, Union, Optional
 
 from py4j.java_gateway import JavaObject
@@ -37,6 +37,9 @@ class ResourceProfile:
 
     .. versionadded:: 3.1.0
 
+    .. versionchanged:: 4.0.0
+        Supports Spark Connect.
+
     Notes
     -----
     This API is evolving.
@@ -99,6 +102,11 @@ class ResourceProfile:
         _exec_req: Optional[Dict[str, ExecutorResourceRequest]] = None,
         _task_req: Optional[Dict[str, TaskResourceRequest]] = None,
     ):
+        # profile id
+        self._id: Optional[int] = None
+        # lock to protect _id
+        self._lock = RLock()
+
         if _java_resource_profile is not None:
             self._java_resource_profile = _java_resource_profile
         else:
@@ -114,14 +122,25 @@ class ResourceProfile:
         int
             A unique id of this :class:`ResourceProfile`
         """
+        with self._lock:
+            if self._id is None:
+                if self._java_resource_profile is not None:
+                    self._id = self._java_resource_profile.id()
+                else:
+                    from pyspark.sql import is_remote
 
-        if self._java_resource_profile is not None:
-            return self._java_resource_profile.id()
-        else:
-            raise RuntimeError(
-                "SparkContext must be created to get the id, get the id "
-                "after adding the ResourceProfile to an RDD"
-            )
+                    if is_remote():
+                        from pyspark.sql.connect.resource.profile import ResourceProfile
+
+                        # Utilize the connect ResourceProfile to create Spark ResourceProfile
+                        # on the server and get the profile ID.
+                        rp = ResourceProfile(
+                            self._executor_resource_requests, self._task_resource_requests
+                        )
+                        self._id = rp.id
+                    else:
+                        raise RuntimeError("SparkContext must be created to get the profile id.")
+            return self._id
 
     @property
     def taskResources(self) -> Dict[str, TaskResourceRequest]:
@@ -185,7 +204,10 @@ class ResourceProfileBuilder:
 
         # TODO: ignore[attr-defined] will be removed, once SparkContext is inlined
         _jvm = SparkContext._jvm
-        if _jvm is not None:
+
+        from pyspark.sql import is_remote
+
+        if _jvm is not None and not is_remote():
             self._jvm = _jvm
             self._java_resource_profile_builder = (
                 _jvm.org.apache.spark.resource.ResourceProfileBuilder()
diff --git a/python/pyspark/resource/requests.py b/python/pyspark/resource/requests.py
index 043124e69598..d3a43d3a06f7 100644
--- a/python/pyspark/resource/requests.py
+++ b/python/pyspark/resource/requests.py
@@ -164,9 +164,11 @@ class ExecutorResourceRequests:
         _requests: Optional[Dict[str, ExecutorResourceRequest]] = None,
     ):
         from pyspark import SparkContext
+        from pyspark.sql import is_remote
 
         _jvm = _jvm or SparkContext._jvm
-        if _jvm is not None:
+
+        if _jvm is not None and not is_remote():
             self._java_executor_resource_requests = (
                 _jvm.org.apache.spark.resource.ExecutorResourceRequests()
             )
@@ -460,9 +462,11 @@ class TaskResourceRequests:
         _requests: Optional[Dict[str, TaskResourceRequest]] = None,
     ):
         from pyspark import SparkContext
+        from pyspark.sql import is_remote
 
         _jvm = _jvm or SparkContext._jvm
-        if _jvm is not None:
+
+        if _jvm is not None and not is_remote():
             self._java_task_resource_requests: Optional[
                 JavaObject
             ] = _jvm.org.apache.spark.resource.TaskResourceRequests()
diff --git a/python/pyspark/resource/tests/test_resources.py b/python/pyspark/resource/tests/test_connect_resources.py
similarity index 61%
copy from python/pyspark/resource/tests/test_resources.py
copy to python/pyspark/resource/tests/test_connect_resources.py
index 81a4ea4f1d89..1667037367ba 100644
--- a/python/pyspark/resource/tests/test_resources.py
+++ b/python/pyspark/resource/tests/test_connect_resources.py
@@ -16,15 +16,34 @@
 #
 import unittest
 
-from pyspark.resource import ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests
+from pyspark.resource import ResourceProfileBuilder, TaskResourceRequests, ExecutorResourceRequests
+from pyspark.sql import SparkSession
 
 
 class ResourceProfileTests(unittest.TestCase):
-    def test_profile_before_sc(self):
+    def test_profile_before_sc_for_connect(self):
         rpb = ResourceProfileBuilder()
-        ereqs = ExecutorResourceRequests().cores(2).memory("6g").memoryOverhead("1g")
-        ereqs.pysparkMemory("2g").offheapMemory("3g").resource("gpu", 2, "testGpus", "nvidia.com")
-        treqs = TaskResourceRequests().cpus(2).resource("gpu", 2)
+        treqs = TaskResourceRequests().cpus(2)
+        # no exception for building ResourceProfile
+        rp = rpb.require(treqs).build
+
+        # check taskResources, similar to executorResources.
+        self.assertEqual(rp.taskResources["cpus"].amount, 2.0)
+
+        # SparkContext is not initialized and is not remote.
+        with self.assertRaisesRegex(
+            RuntimeError, "SparkContext must be created to get the profile id."
+        ):
+            rp.id
+
+        # Remote mode.
+        spark = SparkSession.builder.remote("local-cluster[1, 2, 1024]").getOrCreate()
+        # Still can access taskResources, similar to executorResources.
+        self.assertEqual(rp.taskResources["cpus"].amount, 2.0)
+        rp.id
+        df = spark.range(10)
+        df.mapInPandas(lambda x: x, df.schema, False, rp).collect()
+        df.mapInArrow(lambda x: x, df.schema, False, rp).collect()
 
         def assert_request_contents(exec_reqs, task_reqs):
             self.assertEqual(len(exec_reqs), 6)
@@ -41,38 +60,18 @@ class ResourceProfileTests(unittest.TestCase):
             self.assertEqual(task_reqs["cpus"].amount, 2.0)
             self.assertEqual(task_reqs["gpu"].amount, 2.0)
 
-        assert_request_contents(ereqs.requests, treqs.requests)
+        rpb = ResourceProfileBuilder()
+        ereqs = ExecutorResourceRequests().cores(2).memory("6g").memoryOverhead("1g")
+        ereqs.pysparkMemory("2g").offheapMemory("3g").resource("gpu", 2, "testGpus", "nvidia.com")
+        treqs = TaskResourceRequests().cpus(2).resource("gpu", 2)
         rp = rpb.require(ereqs).require(treqs).build
         assert_request_contents(rp.executorResources, rp.taskResources)
-        from pyspark import SparkContext, SparkConf
 
-        sc = SparkContext(conf=SparkConf())
-        rdd = sc.parallelize(range(10)).withResources(rp)
-        return_rp = rdd.getResourceProfile()
-        assert_request_contents(return_rp.executorResources, return_rp.taskResources)
-        # intermix objects created before SparkContext init and after
-        rpb2 = ResourceProfileBuilder()
-        # use reqs created before SparkContext with Builder after
-        rpb2.require(ereqs)
-        rpb2.require(treqs)
-        rp2 = rpb2.build
-        self.assertTrue(rp2.id > 0)
-        rdd2 = sc.parallelize(range(10)).withResources(rp2)
-        return_rp2 = rdd2.getResourceProfile()
-        assert_request_contents(return_rp2.executorResources, return_rp2.taskResources)
-        ereqs2 = ExecutorResourceRequests().cores(2).memory("6g").memoryOverhead("1g")
-        ereqs.pysparkMemory("2g").resource("gpu", 2, "testGpus", "nvidia.com")
-        treqs2 = TaskResourceRequests().cpus(2).resource("gpu", 2)
-        # use reqs created after SparkContext with Builder before
-        rpb.require(ereqs2)
-        rpb.require(treqs2)
-        rp3 = rpb.build
-        assert_request_contents(rp3.executorResources, rp3.taskResources)
-        sc.stop()
+        spark.stop()
 
 
 if __name__ == "__main__":
-    from pyspark.resource.tests.test_resources import *  # noqa: F401
+    from pyspark.resource.tests.test_connect_resources import *  # noqa: F401
 
     try:
         import xmlrunner
diff --git a/python/pyspark/resource/tests/test_resources.py b/python/pyspark/resource/tests/test_resources.py
index 81a4ea4f1d89..20af820520e5 100644
--- a/python/pyspark/resource/tests/test_resources.py
+++ b/python/pyspark/resource/tests/test_resources.py
@@ -17,6 +17,7 @@
 import unittest
 
 from pyspark.resource import ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests
+from pyspark.sql import SparkSession
 
 
 class ResourceProfileTests(unittest.TestCase):
@@ -70,6 +71,18 @@ class ResourceProfileTests(unittest.TestCase):
         assert_request_contents(rp3.executorResources, rp3.taskResources)
         sc.stop()
 
+    def test_profile_before_sc_for_sql(self):
+        rpb = ResourceProfileBuilder()
+        treqs = TaskResourceRequests().cpus(2)
+        # no exception for building ResourceProfile
+        rp = rpb.require(treqs).build
+
+        spark = SparkSession.builder.master("local-cluster[1, 2, 1024]").getOrCreate()
+        df = spark.range(10)
+        df.mapInPandas(lambda x: x, df.schema, False, rp).collect()
+        df.mapInArrow(lambda x: x, df.schema, False, rp).collect()
+        spark.stop()
+
 
 if __name__ == "__main__":
     from pyspark.resource.tests.test_resources import *  # noqa: F401
diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index 322a882d03e5..c8cf12f40708 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -1321,6 +1321,9 @@ class SparkConnectClient(object):
                         + f"{num_records_in_batch}."
                     )
                 num_records += num_records_in_batch
+            if b.HasField("create_resource_profile_command_result"):
+                profile_id = b.create_resource_profile_command_result.profile_id
+                yield {"create_resource_profile_command_result": profile_id}
 
         try:
             if self._use_reattachable_execute:
@@ -1746,3 +1749,12 @@ class SparkConnectClient(object):
         else:
             # Update the server side session ID.
             self._server_session_id = response.server_side_session_id
+
+    def _create_profile(self, profile: pb2.ResourceProfile) -> int:
+        """Create the ResourceProfile on the server side and return the profile ID"""
+        logger.info("Creating the ResourceProfile")
+        cmd = pb2.Command()
+        cmd.create_resource_profile_command.profile.CopyFrom(profile)
+        (_, properties) = self.execute_command(cmd)
+        profile_id = properties["create_resource_profile_command_result"]
+        return profile_id
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 806c8c0284e6..820a15429ecf 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -19,6 +19,7 @@ from pyspark.errors.exceptions.base import (
     PySparkIndexError,
     PySparkAttributeError,
 )
+from pyspark.resource import ResourceProfile
 from pyspark.sql.connect.utils import check_dependencies
 
 check_dependencies(__name__)
@@ -2059,6 +2060,7 @@ class DataFrame:
         schema: Union[StructType, str],
         evalType: int,
         barrier: bool,
+        profile: Optional[ResourceProfile],
     ) -> "DataFrame":
         from pyspark.sql.connect.udf import UserDefinedFunction
 
@@ -2070,7 +2072,11 @@ class DataFrame:
 
         return DataFrame(
             plan.MapPartitions(
-                child=self._plan, function=udf_obj, cols=self.columns, is_barrier=barrier
+                child=self._plan,
+                function=udf_obj,
+                cols=self.columns,
+                is_barrier=barrier,
+                profile=profile,
             ),
             session=self._session,
         )
@@ -2080,8 +2086,11 @@ class DataFrame:
         func: "PandasMapIterFunction",
         schema: Union[StructType, str],
         barrier: bool = False,
+        profile: Optional[ResourceProfile] = None,
     ) -> "DataFrame":
-        return self._map_partitions(func, schema, PythonEvalType.SQL_MAP_PANDAS_ITER_UDF, barrier)
+        return self._map_partitions(
+            func, schema, PythonEvalType.SQL_MAP_PANDAS_ITER_UDF, barrier, profile
+        )
 
     mapInPandas.__doc__ = PySparkDataFrame.mapInPandas.__doc__
 
@@ -2090,8 +2099,11 @@ class DataFrame:
         func: "ArrowMapIterFunction",
         schema: Union[StructType, str],
         barrier: bool = False,
+        profile: Optional[ResourceProfile] = None,
     ) -> "DataFrame":
-        return self._map_partitions(func, schema, PythonEvalType.SQL_MAP_ARROW_ITER_UDF, barrier)
+        return self._map_partitions(
+            func, schema, PythonEvalType.SQL_MAP_ARROW_ITER_UDF, barrier, profile
+        )
 
     mapInArrow.__doc__ = PySparkDataFrame.mapInArrow.__doc__
 
diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py
index 863c27fabf6b..7751e42466aa 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from pyspark.resource import ResourceProfile
 from pyspark.sql.connect.utils import check_dependencies
 
 check_dependencies(__name__)
@@ -2085,11 +2086,13 @@ class MapPartitions(LogicalPlan):
         function: "UserDefinedFunction",
         cols: List[str],
         is_barrier: bool,
+        profile: Optional[ResourceProfile],
     ) -> None:
         super().__init__(child)
 
         self._function = function._build_common_inline_user_defined_function(*cols)
         self._is_barrier = is_barrier
+        self._profile = profile
 
     def plan(self, session: "SparkConnectClient") -> proto.Relation:
         assert self._child is not None
@@ -2097,6 +2100,8 @@ class MapPartitions(LogicalPlan):
         plan.map_partitions.input.CopyFrom(self._child.plan(session))
         plan.map_partitions.func.CopyFrom(self._function.to_plan_udf(session))
         plan.map_partitions.is_barrier = self._is_barrier
+        if self._profile is not None:
+            plan.map_partitions.profile_id = self._profile.id
         return plan
 
 
diff --git a/python/pyspark/sql/connect/proto/base_pb2.py b/python/pyspark/sql/connect/proto/base_pb2.py
index af42f9b73628..2943057a99fc 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.py
+++ b/python/pyspark/sql/connect/proto/base_pb2.py
@@ -37,7 +37,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    
b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01
 
\x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02
 
\x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17
 [...]
+    
b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01
 
\x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02
 
\x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17
 [...]
 )
 
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
@@ -120,105 +120,105 @@ if _descriptor._USE_C_DESCRIPTORS == False:
     _EXECUTEPLANREQUEST_REQUESTOPTION._serialized_start = 5196
     _EXECUTEPLANREQUEST_REQUESTOPTION._serialized_end = 5361
     _EXECUTEPLANRESPONSE._serialized_start = 5440
-    _EXECUTEPLANRESPONSE._serialized_end = 7653
-    _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_start = 6789
-    _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_end = 6860
-    _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 6862
-    _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 6980
-    _EXECUTEPLANRESPONSE_METRICS._serialized_start = 6983
-    _EXECUTEPLANRESPONSE_METRICS._serialized_end = 7500
-    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 7078
-    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 7410
-    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 7287
-    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 7410
-    _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 7412
-    _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 7500
-    _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_start = 7502
-    _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_end = 7618
-    _EXECUTEPLANRESPONSE_RESULTCOMPLETE._serialized_start = 7620
-    _EXECUTEPLANRESPONSE_RESULTCOMPLETE._serialized_end = 7636
-    _KEYVALUE._serialized_start = 7655
-    _KEYVALUE._serialized_end = 7720
-    _CONFIGREQUEST._serialized_start = 7723
-    _CONFIGREQUEST._serialized_end = 8882
-    _CONFIGREQUEST_OPERATION._serialized_start = 8031
-    _CONFIGREQUEST_OPERATION._serialized_end = 8529
-    _CONFIGREQUEST_SET._serialized_start = 8531
-    _CONFIGREQUEST_SET._serialized_end = 8583
-    _CONFIGREQUEST_GET._serialized_start = 8585
-    _CONFIGREQUEST_GET._serialized_end = 8610
-    _CONFIGREQUEST_GETWITHDEFAULT._serialized_start = 8612
-    _CONFIGREQUEST_GETWITHDEFAULT._serialized_end = 8675
-    _CONFIGREQUEST_GETOPTION._serialized_start = 8677
-    _CONFIGREQUEST_GETOPTION._serialized_end = 8708
-    _CONFIGREQUEST_GETALL._serialized_start = 8710
-    _CONFIGREQUEST_GETALL._serialized_end = 8758
-    _CONFIGREQUEST_UNSET._serialized_start = 8760
-    _CONFIGREQUEST_UNSET._serialized_end = 8787
-    _CONFIGREQUEST_ISMODIFIABLE._serialized_start = 8789
-    _CONFIGREQUEST_ISMODIFIABLE._serialized_end = 8823
-    _CONFIGRESPONSE._serialized_start = 8885
-    _CONFIGRESPONSE._serialized_end = 9060
-    _ADDARTIFACTSREQUEST._serialized_start = 9063
-    _ADDARTIFACTSREQUEST._serialized_end = 10065
-    _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_start = 9538
-    _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_end = 9591
-    _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_start = 9593
-    _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_end = 9704
-    _ADDARTIFACTSREQUEST_BATCH._serialized_start = 9706
-    _ADDARTIFACTSREQUEST_BATCH._serialized_end = 9799
-    _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_start = 9802
-    _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_end = 9995
-    _ADDARTIFACTSRESPONSE._serialized_start = 10068
-    _ADDARTIFACTSRESPONSE._serialized_end = 10340
-    _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_start = 10259
-    _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_end = 10340
-    _ARTIFACTSTATUSESREQUEST._serialized_start = 10343
-    _ARTIFACTSTATUSESREQUEST._serialized_end = 10669
-    _ARTIFACTSTATUSESRESPONSE._serialized_start = 10672
-    _ARTIFACTSTATUSESRESPONSE._serialized_end = 11024
-    _ARTIFACTSTATUSESRESPONSE_STATUSESENTRY._serialized_start = 10867
-    _ARTIFACTSTATUSESRESPONSE_STATUSESENTRY._serialized_end = 10982
-    _ARTIFACTSTATUSESRESPONSE_ARTIFACTSTATUS._serialized_start = 10984
-    _ARTIFACTSTATUSESRESPONSE_ARTIFACTSTATUS._serialized_end = 11024
-    _INTERRUPTREQUEST._serialized_start = 11027
-    _INTERRUPTREQUEST._serialized_end = 11630
-    _INTERRUPTREQUEST_INTERRUPTTYPE._serialized_start = 11430
-    _INTERRUPTREQUEST_INTERRUPTTYPE._serialized_end = 11558
-    _INTERRUPTRESPONSE._serialized_start = 11633
-    _INTERRUPTRESPONSE._serialized_end = 11777
-    _REATTACHOPTIONS._serialized_start = 11779
-    _REATTACHOPTIONS._serialized_end = 11832
-    _REATTACHEXECUTEREQUEST._serialized_start = 11835
-    _REATTACHEXECUTEREQUEST._serialized_end = 12241
-    _RELEASEEXECUTEREQUEST._serialized_start = 12244
-    _RELEASEEXECUTEREQUEST._serialized_end = 12829
-    _RELEASEEXECUTEREQUEST_RELEASEALL._serialized_start = 12698
-    _RELEASEEXECUTEREQUEST_RELEASEALL._serialized_end = 12710
-    _RELEASEEXECUTEREQUEST_RELEASEUNTIL._serialized_start = 12712
-    _RELEASEEXECUTEREQUEST_RELEASEUNTIL._serialized_end = 12759
-    _RELEASEEXECUTERESPONSE._serialized_start = 12832
-    _RELEASEEXECUTERESPONSE._serialized_end = 12997
-    _RELEASESESSIONREQUEST._serialized_start = 13000
-    _RELEASESESSIONREQUEST._serialized_end = 13171
-    _RELEASESESSIONRESPONSE._serialized_start = 13173
-    _RELEASESESSIONRESPONSE._serialized_end = 13281
-    _FETCHERRORDETAILSREQUEST._serialized_start = 13284
-    _FETCHERRORDETAILSREQUEST._serialized_end = 13616
-    _FETCHERRORDETAILSRESPONSE._serialized_start = 13619
-    _FETCHERRORDETAILSRESPONSE._serialized_end = 15174
-    _FETCHERRORDETAILSRESPONSE_STACKTRACEELEMENT._serialized_start = 13848
-    _FETCHERRORDETAILSRESPONSE_STACKTRACEELEMENT._serialized_end = 14022
-    _FETCHERRORDETAILSRESPONSE_QUERYCONTEXT._serialized_start = 14025
-    _FETCHERRORDETAILSRESPONSE_QUERYCONTEXT._serialized_end = 14393
-    _FETCHERRORDETAILSRESPONSE_QUERYCONTEXT_CONTEXTTYPE._serialized_start = 14356
-    _FETCHERRORDETAILSRESPONSE_QUERYCONTEXT_CONTEXTTYPE._serialized_end = 14393
-    _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE._serialized_start = 14396
-    _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE._serialized_end = 14805
-    _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE_MESSAGEPARAMETERSENTRY._serialized_start = 14707
-    _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE_MESSAGEPARAMETERSENTRY._serialized_end = 14775
-    _FETCHERRORDETAILSRESPONSE_ERROR._serialized_start = 14808
-    _FETCHERRORDETAILSRESPONSE_ERROR._serialized_end = 15155
-    _SPARKCONNECTSERVICE._serialized_start = 15177
-    _SPARKCONNECTSERVICE._serialized_end = 16123
+    _EXECUTEPLANRESPONSE._serialized_end = 7791
+    _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_start = 6927
+    _EXECUTEPLANRESPONSE_SQLCOMMANDRESULT._serialized_end = 6998
+    _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_start = 7000
+    _EXECUTEPLANRESPONSE_ARROWBATCH._serialized_end = 7118
+    _EXECUTEPLANRESPONSE_METRICS._serialized_start = 7121
+    _EXECUTEPLANRESPONSE_METRICS._serialized_end = 7638
+    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_start = 7216
+    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT._serialized_end = 7548
+    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_start = 7425
+    _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 7548
+    _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 7550
+    _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 7638
+    _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_start = 7640
+    _EXECUTEPLANRESPONSE_OBSERVEDMETRICS._serialized_end = 7756
+    _EXECUTEPLANRESPONSE_RESULTCOMPLETE._serialized_start = 7758
+    _EXECUTEPLANRESPONSE_RESULTCOMPLETE._serialized_end = 7774
+    _KEYVALUE._serialized_start = 7793
+    _KEYVALUE._serialized_end = 7858
+    _CONFIGREQUEST._serialized_start = 7861
+    _CONFIGREQUEST._serialized_end = 9020
+    _CONFIGREQUEST_OPERATION._serialized_start = 8169
+    _CONFIGREQUEST_OPERATION._serialized_end = 8667
+    _CONFIGREQUEST_SET._serialized_start = 8669
+    _CONFIGREQUEST_SET._serialized_end = 8721
+    _CONFIGREQUEST_GET._serialized_start = 8723
+    _CONFIGREQUEST_GET._serialized_end = 8748
+    _CONFIGREQUEST_GETWITHDEFAULT._serialized_start = 8750
+    _CONFIGREQUEST_GETWITHDEFAULT._serialized_end = 8813
+    _CONFIGREQUEST_GETOPTION._serialized_start = 8815
+    _CONFIGREQUEST_GETOPTION._serialized_end = 8846
+    _CONFIGREQUEST_GETALL._serialized_start = 8848
+    _CONFIGREQUEST_GETALL._serialized_end = 8896
+    _CONFIGREQUEST_UNSET._serialized_start = 8898
+    _CONFIGREQUEST_UNSET._serialized_end = 8925
+    _CONFIGREQUEST_ISMODIFIABLE._serialized_start = 8927
+    _CONFIGREQUEST_ISMODIFIABLE._serialized_end = 8961
+    _CONFIGRESPONSE._serialized_start = 9023
+    _CONFIGRESPONSE._serialized_end = 9198
+    _ADDARTIFACTSREQUEST._serialized_start = 9201
+    _ADDARTIFACTSREQUEST._serialized_end = 10203
+    _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_start = 9676
+    _ADDARTIFACTSREQUEST_ARTIFACTCHUNK._serialized_end = 9729
+    _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_start = 9731
+    _ADDARTIFACTSREQUEST_SINGLECHUNKARTIFACT._serialized_end = 9842
+    _ADDARTIFACTSREQUEST_BATCH._serialized_start = 9844
+    _ADDARTIFACTSREQUEST_BATCH._serialized_end = 9937
+    _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_start = 9940
+    _ADDARTIFACTSREQUEST_BEGINCHUNKEDARTIFACT._serialized_end = 10133
+    _ADDARTIFACTSRESPONSE._serialized_start = 10206
+    _ADDARTIFACTSRESPONSE._serialized_end = 10478
+    _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_start = 10397
+    _ADDARTIFACTSRESPONSE_ARTIFACTSUMMARY._serialized_end = 10478
+    _ARTIFACTSTATUSESREQUEST._serialized_start = 10481
+    _ARTIFACTSTATUSESREQUEST._serialized_end = 10807
+    _ARTIFACTSTATUSESRESPONSE._serialized_start = 10810
+    _ARTIFACTSTATUSESRESPONSE._serialized_end = 11162
+    _ARTIFACTSTATUSESRESPONSE_STATUSESENTRY._serialized_start = 11005
+    _ARTIFACTSTATUSESRESPONSE_STATUSESENTRY._serialized_end = 11120
+    _ARTIFACTSTATUSESRESPONSE_ARTIFACTSTATUS._serialized_start = 11122
+    _ARTIFACTSTATUSESRESPONSE_ARTIFACTSTATUS._serialized_end = 11162
+    _INTERRUPTREQUEST._serialized_start = 11165
+    _INTERRUPTREQUEST._serialized_end = 11768
+    _INTERRUPTREQUEST_INTERRUPTTYPE._serialized_start = 11568
+    _INTERRUPTREQUEST_INTERRUPTTYPE._serialized_end = 11696
+    _INTERRUPTRESPONSE._serialized_start = 11771
+    _INTERRUPTRESPONSE._serialized_end = 11915
+    _REATTACHOPTIONS._serialized_start = 11917
+    _REATTACHOPTIONS._serialized_end = 11970
+    _REATTACHEXECUTEREQUEST._serialized_start = 11973
+    _REATTACHEXECUTEREQUEST._serialized_end = 12379
+    _RELEASEEXECUTEREQUEST._serialized_start = 12382
+    _RELEASEEXECUTEREQUEST._serialized_end = 12967
+    _RELEASEEXECUTEREQUEST_RELEASEALL._serialized_start = 12836
+    _RELEASEEXECUTEREQUEST_RELEASEALL._serialized_end = 12848
+    _RELEASEEXECUTEREQUEST_RELEASEUNTIL._serialized_start = 12850
+    _RELEASEEXECUTEREQUEST_RELEASEUNTIL._serialized_end = 12897
+    _RELEASEEXECUTERESPONSE._serialized_start = 12970
+    _RELEASEEXECUTERESPONSE._serialized_end = 13135
+    _RELEASESESSIONREQUEST._serialized_start = 13138
+    _RELEASESESSIONREQUEST._serialized_end = 13309
+    _RELEASESESSIONRESPONSE._serialized_start = 13311
+    _RELEASESESSIONRESPONSE._serialized_end = 13419
+    _FETCHERRORDETAILSREQUEST._serialized_start = 13422
+    _FETCHERRORDETAILSREQUEST._serialized_end = 13754
+    _FETCHERRORDETAILSRESPONSE._serialized_start = 13757
+    _FETCHERRORDETAILSRESPONSE._serialized_end = 15312
+    _FETCHERRORDETAILSRESPONSE_STACKTRACEELEMENT._serialized_start = 13986
+    _FETCHERRORDETAILSRESPONSE_STACKTRACEELEMENT._serialized_end = 14160
+    _FETCHERRORDETAILSRESPONSE_QUERYCONTEXT._serialized_start = 14163
+    _FETCHERRORDETAILSRESPONSE_QUERYCONTEXT._serialized_end = 14531
+    _FETCHERRORDETAILSRESPONSE_QUERYCONTEXT_CONTEXTTYPE._serialized_start = 14494
+    _FETCHERRORDETAILSRESPONSE_QUERYCONTEXT_CONTEXTTYPE._serialized_end = 14531
+    _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE._serialized_start = 14534
+    _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE._serialized_end = 14943
+    _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE_MESSAGEPARAMETERSENTRY._serialized_start = 14845
+    _FETCHERRORDETAILSRESPONSE_SPARKTHROWABLE_MESSAGEPARAMETERSENTRY._serialized_end = 14913
+    _FETCHERRORDETAILSRESPONSE_ERROR._serialized_start = 14946
+    _FETCHERRORDETAILSRESPONSE_ERROR._serialized_end = 15293
+    _SPARKCONNECTSERVICE._serialized_start = 15315
+    _SPARKCONNECTSERVICE._serialized_end = 16261
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/base_pb2.pyi b/python/pyspark/sql/connect/proto/base_pb2.pyi
index 86dec5711ece..562977331952 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/base_pb2.pyi
@@ -1458,6 +1458,7 @@ class ExecutePlanResponse(google.protobuf.message.Message):
     STREAMING_QUERY_MANAGER_COMMAND_RESULT_FIELD_NUMBER: builtins.int
     STREAMING_QUERY_LISTENER_EVENTS_RESULT_FIELD_NUMBER: builtins.int
     RESULT_COMPLETE_FIELD_NUMBER: builtins.int
+    CREATE_RESOURCE_PROFILE_COMMAND_RESULT_FIELD_NUMBER: builtins.int
     EXTENSION_FIELD_NUMBER: builtins.int
     METRICS_FIELD_NUMBER: builtins.int
     OBSERVED_METRICS_FIELD_NUMBER: builtins.int
@@ -1511,6 +1512,11 @@ class ExecutePlanResponse(google.protobuf.message.Message):
    def result_complete(self) -> global___ExecutePlanResponse.ResultComplete:
        """Response type informing if the stream is complete in reattachable execution."""
     @property
+    def create_resource_profile_command_result(
+        self,
+    ) -> pyspark.sql.connect.proto.commands_pb2.CreateResourceProfileCommandResult:
+        """Response for command that creates ResourceProfile."""
+    @property
     def extension(self) -> google.protobuf.any_pb2.Any:
         """Support arbitrary result objects."""
     @property
@@ -1548,6 +1554,8 @@ class ExecutePlanResponse(google.protobuf.message.Message):
        streaming_query_listener_events_result: pyspark.sql.connect.proto.commands_pb2.StreamingQueryListenerEventsResult
        | None = ...,
        result_complete: global___ExecutePlanResponse.ResultComplete | None = ...,
+        create_resource_profile_command_result: pyspark.sql.connect.proto.commands_pb2.CreateResourceProfileCommandResult
+        | None = ...,
        extension: google.protobuf.any_pb2.Any | None = ...,
        metrics: global___ExecutePlanResponse.Metrics | None = ...,
        observed_metrics: collections.abc.Iterable[global___ExecutePlanResponse.ObservedMetrics]
@@ -1559,6 +1567,8 @@ class ExecutePlanResponse(google.protobuf.message.Message):
         field_name: typing_extensions.Literal[
             "arrow_batch",
             b"arrow_batch",
+            "create_resource_profile_command_result",
+            b"create_resource_profile_command_result",
             "extension",
             b"extension",
             "get_resources_command_result",
@@ -1588,6 +1598,8 @@ class ExecutePlanResponse(google.protobuf.message.Message):
         field_name: typing_extensions.Literal[
             "arrow_batch",
             b"arrow_batch",
+            "create_resource_profile_command_result",
+            b"create_resource_profile_command_result",
             "extension",
             b"extension",
             "get_resources_command_result",
@@ -1634,6 +1646,7 @@ class ExecutePlanResponse(google.protobuf.message.Message):
             "streaming_query_manager_command_result",
             "streaming_query_listener_events_result",
             "result_complete",
+            "create_resource_profile_command_result",
             "extension",
         ]
         | None
diff --git a/python/pyspark/sql/connect/proto/commands_pb2.py b/python/pyspark/sql/connect/proto/commands_pb2.py
index 390ce3988a4b..eba96d28eb68 100644
--- a/python/pyspark/sql/connect/proto/commands_pb2.py
+++ b/python/pyspark/sql/connect/proto/commands_pb2.py
@@ -35,7 +35,7 @@ from pyspark.sql.connect.proto import relations_pb2 as spark_dot_connect_dot_relations__
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    
b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto"\xdf\t\n\x07\x43ommand\x12]\n\x11register_function\x18\x01
 
\x01(\x0b\x32..spark.connect.CommonInlineUserDefinedFunctionH\x00R\x10registerFunction\x12H\n\x0fwrite_operation\x18\x02
 
\x01(\x0b\x32\x1d.spark.connect.WriteOperationH\x00R\x0ewriteOperation\x12_\n\x15\x63reate_dataframe_view\x18
 [...]
+    
b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto"\xd5\n\n\x07\x43ommand\x12]\n\x11register_function\x18\x01
 
\x01(\x0b\x32..spark.connect.CommonInlineUserDefinedFunctionH\x00R\x10registerFunction\x12H\n\x0fwrite_operation\x18\x02
 
\x01(\x0b\x32\x1d.spark.connect.WriteOperationH\x00R\x0ewriteOperation\x12_\n\x15\x63reate_dataframe_view\x18
 [...]
 )
 
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
@@ -63,94 +63,98 @@ if _descriptor._USE_C_DESCRIPTORS == False:
     _WRITESTREAMOPERATIONSTART_OPTIONSENTRY._serialized_options = b"8\001"
     _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._options = None
     _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_options = b"8\001"
-    _STREAMINGQUERYEVENTTYPE._serialized_start = 9744
-    _STREAMINGQUERYEVENTTYPE._serialized_end = 9877
+    _STREAMINGQUERYEVENTTYPE._serialized_start = 10021
+    _STREAMINGQUERYEVENTTYPE._serialized_end = 10154
     _COMMAND._serialized_start = 167
-    _COMMAND._serialized_end = 1414
-    _SQLCOMMAND._serialized_start = 1417
-    _SQLCOMMAND._serialized_end = 1912
-    _SQLCOMMAND_ARGSENTRY._serialized_start = 1728
-    _SQLCOMMAND_ARGSENTRY._serialized_end = 1818
-    _SQLCOMMAND_NAMEDARGUMENTSENTRY._serialized_start = 1820
-    _SQLCOMMAND_NAMEDARGUMENTSENTRY._serialized_end = 1912
-    _CREATEDATAFRAMEVIEWCOMMAND._serialized_start = 1915
-    _CREATEDATAFRAMEVIEWCOMMAND._serialized_end = 2065
-    _WRITEOPERATION._serialized_start = 2068
-    _WRITEOPERATION._serialized_end = 3166
-    _WRITEOPERATION_OPTIONSENTRY._serialized_start = 2590
-    _WRITEOPERATION_OPTIONSENTRY._serialized_end = 2648
-    _WRITEOPERATION_SAVETABLE._serialized_start = 2651
-    _WRITEOPERATION_SAVETABLE._serialized_end = 2909
-    _WRITEOPERATION_SAVETABLE_TABLESAVEMETHOD._serialized_start = 2785
-    _WRITEOPERATION_SAVETABLE_TABLESAVEMETHOD._serialized_end = 2909
-    _WRITEOPERATION_BUCKETBY._serialized_start = 2911
-    _WRITEOPERATION_BUCKETBY._serialized_end = 3002
-    _WRITEOPERATION_SAVEMODE._serialized_start = 3005
-    _WRITEOPERATION_SAVEMODE._serialized_end = 3142
-    _WRITEOPERATIONV2._serialized_start = 3169
-    _WRITEOPERATIONV2._serialized_end = 4029
-    _WRITEOPERATIONV2_OPTIONSENTRY._serialized_start = 2590
-    _WRITEOPERATIONV2_OPTIONSENTRY._serialized_end = 2648
-    _WRITEOPERATIONV2_TABLEPROPERTIESENTRY._serialized_start = 3788
-    _WRITEOPERATIONV2_TABLEPROPERTIESENTRY._serialized_end = 3854
-    _WRITEOPERATIONV2_MODE._serialized_start = 3857
-    _WRITEOPERATIONV2_MODE._serialized_end = 4016
-    _WRITESTREAMOPERATIONSTART._serialized_start = 4032
-    _WRITESTREAMOPERATIONSTART._serialized_end = 4832
-    _WRITESTREAMOPERATIONSTART_OPTIONSENTRY._serialized_start = 2590
-    _WRITESTREAMOPERATIONSTART_OPTIONSENTRY._serialized_end = 2648
-    _STREAMINGFOREACHFUNCTION._serialized_start = 4835
-    _STREAMINGFOREACHFUNCTION._serialized_end = 5014
-    _WRITESTREAMOPERATIONSTARTRESULT._serialized_start = 5017
-    _WRITESTREAMOPERATIONSTARTRESULT._serialized_end = 5229
-    _STREAMINGQUERYINSTANCEID._serialized_start = 5231
-    _STREAMINGQUERYINSTANCEID._serialized_end = 5296
-    _STREAMINGQUERYCOMMAND._serialized_start = 5299
-    _STREAMINGQUERYCOMMAND._serialized_end = 5931
-    _STREAMINGQUERYCOMMAND_EXPLAINCOMMAND._serialized_start = 5798
-    _STREAMINGQUERYCOMMAND_EXPLAINCOMMAND._serialized_end = 5842
-    _STREAMINGQUERYCOMMAND_AWAITTERMINATIONCOMMAND._serialized_start = 5844
-    _STREAMINGQUERYCOMMAND_AWAITTERMINATIONCOMMAND._serialized_end = 5920
-    _STREAMINGQUERYCOMMANDRESULT._serialized_start = 5934
-    _STREAMINGQUERYCOMMANDRESULT._serialized_end = 7075
-    _STREAMINGQUERYCOMMANDRESULT_STATUSRESULT._serialized_start = 6517
-    _STREAMINGQUERYCOMMANDRESULT_STATUSRESULT._serialized_end = 6687
-    _STREAMINGQUERYCOMMANDRESULT_RECENTPROGRESSRESULT._serialized_start = 6689
-    _STREAMINGQUERYCOMMANDRESULT_RECENTPROGRESSRESULT._serialized_end = 6761
-    _STREAMINGQUERYCOMMANDRESULT_EXPLAINRESULT._serialized_start = 6763
-    _STREAMINGQUERYCOMMANDRESULT_EXPLAINRESULT._serialized_end = 6802
-    _STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT._serialized_start = 6805
-    _STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT._serialized_end = 7002
-    _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_start = 7004
-    _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_end = 7060
-    _STREAMINGQUERYMANAGERCOMMAND._serialized_start = 7078
-    _STREAMINGQUERYMANAGERCOMMAND._serialized_end = 7907
-    _STREAMINGQUERYMANAGERCOMMAND_AWAITANYTERMINATIONCOMMAND._serialized_start = 7609
-    _STREAMINGQUERYMANAGERCOMMAND_AWAITANYTERMINATIONCOMMAND._serialized_end = 7688
-    _STREAMINGQUERYMANAGERCOMMAND_STREAMINGQUERYLISTENERCOMMAND._serialized_start = 7691
-    _STREAMINGQUERYMANAGERCOMMAND_STREAMINGQUERYLISTENERCOMMAND._serialized_end = 7896
-    _STREAMINGQUERYMANAGERCOMMANDRESULT._serialized_start = 7910
-    _STREAMINGQUERYMANAGERCOMMANDRESULT._serialized_end = 8986
-    _STREAMINGQUERYMANAGERCOMMANDRESULT_ACTIVERESULT._serialized_start = 8518
-    _STREAMINGQUERYMANAGERCOMMANDRESULT_ACTIVERESULT._serialized_end = 8645
-    _STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYINSTANCE._serialized_start = 8647
-    _STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYINSTANCE._serialized_end = 8762
-    _STREAMINGQUERYMANAGERCOMMANDRESULT_AWAITANYTERMINATIONRESULT._serialized_start = 8764
-    _STREAMINGQUERYMANAGERCOMMANDRESULT_AWAITANYTERMINATIONRESULT._serialized_end = 8823
-    _STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYLISTENERINSTANCE._serialized_start = 8825
-    _STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYLISTENERINSTANCE._serialized_end = 8900
-    _STREAMINGQUERYMANAGERCOMMANDRESULT_LISTSTREAMINGQUERYLISTENERRESULT._serialized_start = 8902
-    _STREAMINGQUERYMANAGERCOMMANDRESULT_LISTSTREAMINGQUERYLISTENERRESULT._serialized_end = 8971
-    _STREAMINGQUERYLISTENERBUSCOMMAND._serialized_start = 8989
-    _STREAMINGQUERYLISTENERBUSCOMMAND._serialized_end = 9162
-    _STREAMINGQUERYLISTENEREVENT._serialized_start = 9165
-    _STREAMINGQUERYLISTENEREVENT._serialized_end = 9296
-    _STREAMINGQUERYLISTENEREVENTSRESULT._serialized_start = 9299
-    _STREAMINGQUERYLISTENEREVENTSRESULT._serialized_end = 9503
-    _GETRESOURCESCOMMAND._serialized_start = 9505
-    _GETRESOURCESCOMMAND._serialized_end = 9526
-    _GETRESOURCESCOMMANDRESULT._serialized_start = 9529
-    _GETRESOURCESCOMMANDRESULT._serialized_end = 9741
-    _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_start = 9645
-    _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_end = 9741
+    _COMMAND._serialized_end = 1532
+    _SQLCOMMAND._serialized_start = 1535
+    _SQLCOMMAND._serialized_end = 2030
+    _SQLCOMMAND_ARGSENTRY._serialized_start = 1846
+    _SQLCOMMAND_ARGSENTRY._serialized_end = 1936
+    _SQLCOMMAND_NAMEDARGUMENTSENTRY._serialized_start = 1938
+    _SQLCOMMAND_NAMEDARGUMENTSENTRY._serialized_end = 2030
+    _CREATEDATAFRAMEVIEWCOMMAND._serialized_start = 2033
+    _CREATEDATAFRAMEVIEWCOMMAND._serialized_end = 2183
+    _WRITEOPERATION._serialized_start = 2186
+    _WRITEOPERATION._serialized_end = 3284
+    _WRITEOPERATION_OPTIONSENTRY._serialized_start = 2708
+    _WRITEOPERATION_OPTIONSENTRY._serialized_end = 2766
+    _WRITEOPERATION_SAVETABLE._serialized_start = 2769
+    _WRITEOPERATION_SAVETABLE._serialized_end = 3027
+    _WRITEOPERATION_SAVETABLE_TABLESAVEMETHOD._serialized_start = 2903
+    _WRITEOPERATION_SAVETABLE_TABLESAVEMETHOD._serialized_end = 3027
+    _WRITEOPERATION_BUCKETBY._serialized_start = 3029
+    _WRITEOPERATION_BUCKETBY._serialized_end = 3120
+    _WRITEOPERATION_SAVEMODE._serialized_start = 3123
+    _WRITEOPERATION_SAVEMODE._serialized_end = 3260
+    _WRITEOPERATIONV2._serialized_start = 3287
+    _WRITEOPERATIONV2._serialized_end = 4147
+    _WRITEOPERATIONV2_OPTIONSENTRY._serialized_start = 2708
+    _WRITEOPERATIONV2_OPTIONSENTRY._serialized_end = 2766
+    _WRITEOPERATIONV2_TABLEPROPERTIESENTRY._serialized_start = 3906
+    _WRITEOPERATIONV2_TABLEPROPERTIESENTRY._serialized_end = 3972
+    _WRITEOPERATIONV2_MODE._serialized_start = 3975
+    _WRITEOPERATIONV2_MODE._serialized_end = 4134
+    _WRITESTREAMOPERATIONSTART._serialized_start = 4150
+    _WRITESTREAMOPERATIONSTART._serialized_end = 4950
+    _WRITESTREAMOPERATIONSTART_OPTIONSENTRY._serialized_start = 2708
+    _WRITESTREAMOPERATIONSTART_OPTIONSENTRY._serialized_end = 2766
+    _STREAMINGFOREACHFUNCTION._serialized_start = 4953
+    _STREAMINGFOREACHFUNCTION._serialized_end = 5132
+    _WRITESTREAMOPERATIONSTARTRESULT._serialized_start = 5135
+    _WRITESTREAMOPERATIONSTARTRESULT._serialized_end = 5347
+    _STREAMINGQUERYINSTANCEID._serialized_start = 5349
+    _STREAMINGQUERYINSTANCEID._serialized_end = 5414
+    _STREAMINGQUERYCOMMAND._serialized_start = 5417
+    _STREAMINGQUERYCOMMAND._serialized_end = 6049
+    _STREAMINGQUERYCOMMAND_EXPLAINCOMMAND._serialized_start = 5916
+    _STREAMINGQUERYCOMMAND_EXPLAINCOMMAND._serialized_end = 5960
+    _STREAMINGQUERYCOMMAND_AWAITTERMINATIONCOMMAND._serialized_start = 5962
+    _STREAMINGQUERYCOMMAND_AWAITTERMINATIONCOMMAND._serialized_end = 6038
+    _STREAMINGQUERYCOMMANDRESULT._serialized_start = 6052
+    _STREAMINGQUERYCOMMANDRESULT._serialized_end = 7193
+    _STREAMINGQUERYCOMMANDRESULT_STATUSRESULT._serialized_start = 6635
+    _STREAMINGQUERYCOMMANDRESULT_STATUSRESULT._serialized_end = 6805
+    _STREAMINGQUERYCOMMANDRESULT_RECENTPROGRESSRESULT._serialized_start = 6807
+    _STREAMINGQUERYCOMMANDRESULT_RECENTPROGRESSRESULT._serialized_end = 6879
+    _STREAMINGQUERYCOMMANDRESULT_EXPLAINRESULT._serialized_start = 6881
+    _STREAMINGQUERYCOMMANDRESULT_EXPLAINRESULT._serialized_end = 6920
+    _STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT._serialized_start = 6923
+    _STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT._serialized_end = 7120
+    _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_start = 7122
+    _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_end = 7178
+    _STREAMINGQUERYMANAGERCOMMAND._serialized_start = 7196
+    _STREAMINGQUERYMANAGERCOMMAND._serialized_end = 8025
+    _STREAMINGQUERYMANAGERCOMMAND_AWAITANYTERMINATIONCOMMAND._serialized_start = 7727
+    _STREAMINGQUERYMANAGERCOMMAND_AWAITANYTERMINATIONCOMMAND._serialized_end = 7806
+    _STREAMINGQUERYMANAGERCOMMAND_STREAMINGQUERYLISTENERCOMMAND._serialized_start = 7809
+    _STREAMINGQUERYMANAGERCOMMAND_STREAMINGQUERYLISTENERCOMMAND._serialized_end = 8014
+    _STREAMINGQUERYMANAGERCOMMANDRESULT._serialized_start = 8028
+    _STREAMINGQUERYMANAGERCOMMANDRESULT._serialized_end = 9104
+    _STREAMINGQUERYMANAGERCOMMANDRESULT_ACTIVERESULT._serialized_start = 8636
+    _STREAMINGQUERYMANAGERCOMMANDRESULT_ACTIVERESULT._serialized_end = 8763
+    _STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYINSTANCE._serialized_start = 8765
+    _STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYINSTANCE._serialized_end = 8880
+    _STREAMINGQUERYMANAGERCOMMANDRESULT_AWAITANYTERMINATIONRESULT._serialized_start = 8882
+    _STREAMINGQUERYMANAGERCOMMANDRESULT_AWAITANYTERMINATIONRESULT._serialized_end = 8941
+    _STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYLISTENERINSTANCE._serialized_start = 8943
+    _STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYLISTENERINSTANCE._serialized_end = 9018
+    _STREAMINGQUERYMANAGERCOMMANDRESULT_LISTSTREAMINGQUERYLISTENERRESULT._serialized_start = 9020
+    _STREAMINGQUERYMANAGERCOMMANDRESULT_LISTSTREAMINGQUERYLISTENERRESULT._serialized_end = 9089
+    _STREAMINGQUERYLISTENERBUSCOMMAND._serialized_start = 9107
+    _STREAMINGQUERYLISTENERBUSCOMMAND._serialized_end = 9280
+    _STREAMINGQUERYLISTENEREVENT._serialized_start = 9283
+    _STREAMINGQUERYLISTENEREVENT._serialized_end = 9414
+    _STREAMINGQUERYLISTENEREVENTSRESULT._serialized_start = 9417
+    _STREAMINGQUERYLISTENEREVENTSRESULT._serialized_end = 9621
+    _GETRESOURCESCOMMAND._serialized_start = 9623
+    _GETRESOURCESCOMMAND._serialized_end = 9644
+    _GETRESOURCESCOMMANDRESULT._serialized_start = 9647
+    _GETRESOURCESCOMMANDRESULT._serialized_end = 9859
+    _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_start = 9763
+    _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_end = 9859
+    _CREATERESOURCEPROFILECOMMAND._serialized_start = 9861
+    _CREATERESOURCEPROFILECOMMAND._serialized_end = 9949
+    _CREATERESOURCEPROFILECOMMANDRESULT._serialized_start = 9951
+    _CREATERESOURCEPROFILECOMMANDRESULT._serialized_end = 10018
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/commands_pb2.pyi b/python/pyspark/sql/connect/proto/commands_pb2.pyi
index b4eac1078441..b57a2a6c4d68 100644
--- a/python/pyspark/sql/connect/proto/commands_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/commands_pb2.pyi
@@ -100,6 +100,7 @@ class Command(google.protobuf.message.Message):
     REGISTER_TABLE_FUNCTION_FIELD_NUMBER: builtins.int
     STREAMING_QUERY_LISTENER_BUS_COMMAND_FIELD_NUMBER: builtins.int
     REGISTER_DATA_SOURCE_FIELD_NUMBER: builtins.int
+    CREATE_RESOURCE_PROFILE_COMMAND_FIELD_NUMBER: builtins.int
     EXTENSION_FIELD_NUMBER: builtins.int
     @property
     def register_function(
@@ -132,6 +133,8 @@ class Command(google.protobuf.message.Message):
         self,
     ) -> pyspark.sql.connect.proto.relations_pb2.CommonInlineUserDefinedDataSource: ...
     @property
+    def create_resource_profile_command(self) -> global___CreateResourceProfileCommand: ...
+    @property
     def extension(self) -> google.protobuf.any_pb2.Any:
         """This field is used to mark extensions to the protocol. When plugins 
generate arbitrary
         Commands they can add them here. During the planning the correct 
resolution is done.
@@ -155,6 +158,7 @@ class Command(google.protobuf.message.Message):
         | None = ...,
        register_data_source: pyspark.sql.connect.proto.relations_pb2.CommonInlineUserDefinedDataSource
         | None = ...,
+        create_resource_profile_command: global___CreateResourceProfileCommand | None = ...,
         extension: google.protobuf.any_pb2.Any | None = ...,
     ) -> None: ...
     def HasField(
@@ -164,6 +168,8 @@ class Command(google.protobuf.message.Message):
             b"command_type",
             "create_dataframe_view",
             b"create_dataframe_view",
+            "create_resource_profile_command",
+            b"create_resource_profile_command",
             "extension",
             b"extension",
             "get_resources_command",
@@ -197,6 +203,8 @@ class Command(google.protobuf.message.Message):
             b"command_type",
             "create_dataframe_view",
             b"create_dataframe_view",
+            "create_resource_profile_command",
+            b"create_resource_profile_command",
             "extension",
             b"extension",
             "get_resources_command",
@@ -239,6 +247,7 @@ class Command(google.protobuf.message.Message):
             "register_table_function",
             "streaming_query_listener_bus_command",
             "register_data_source",
+            "create_resource_profile_command",
             "extension",
         ]
         | None
@@ -2060,3 +2069,43 @@ class GetResourcesCommandResult(google.protobuf.message.Message):
     ) -> None: ...
 
 global___GetResourcesCommandResult = GetResourcesCommandResult
+
+class CreateResourceProfileCommand(google.protobuf.message.Message):
+    """Command to create ResourceProfile"""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    PROFILE_FIELD_NUMBER: builtins.int
+    @property
+    def profile(self) -> pyspark.sql.connect.proto.common_pb2.ResourceProfile:
+        """(Required) The ResourceProfile to be built on the server-side."""
+    def __init__(
+        self,
+        *,
+        profile: pyspark.sql.connect.proto.common_pb2.ResourceProfile | None = ...,
+    ) -> None: ...
+    def HasField(
+        self, field_name: typing_extensions.Literal["profile", b"profile"]
+    ) -> builtins.bool: ...
+    def ClearField(self, field_name: typing_extensions.Literal["profile", 
b"profile"]) -> None: ...
+
+global___CreateResourceProfileCommand = CreateResourceProfileCommand
+
+class CreateResourceProfileCommandResult(google.protobuf.message.Message):
+    """Response for command 'CreateResourceProfileCommand'."""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    PROFILE_ID_FIELD_NUMBER: builtins.int
+    profile_id: builtins.int
+    """(Required) Server-side generated resource profile id."""
+    def __init__(
+        self,
+        *,
+        profile_id: builtins.int = ...,
+    ) -> None: ...
+    def ClearField(
+        self, field_name: typing_extensions.Literal["profile_id", 
b"profile_id"]
+    ) -> None: ...
+
+global___CreateResourceProfileCommandResult = CreateResourceProfileCommandResult
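
For orientation, a minimal sketch of how a client could assemble the new command from the generated stubs above. The import paths follow the modules in this diff; the "gpu" resource and its amounts are illustrative assumptions, not part of the commit.

    from pyspark.sql.connect.proto import commands_pb2, common_pb2

    # Build a ResourceProfile message and wrap it in the new command.
    profile = common_pb2.ResourceProfile(
        executor_resources={
            "gpu": common_pb2.ExecutorResourceRequest(resource_name="gpu", amount=1)
        },
        task_resources={
            "gpu": common_pb2.TaskResourceRequest(resource_name="gpu", amount=0.5)
        },
    )
    command = commands_pb2.Command(
        create_resource_profile_command=commands_pb2.CreateResourceProfileCommand(
            profile=profile
        )
    )
    # The server is expected to answer with CreateResourceProfileCommandResult,
    # whose profile_id is what later MapPartitions relations reference.
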
diff --git a/python/pyspark/sql/connect/proto/common_pb2.py b/python/pyspark/sql/connect/proto/common_pb2.py
index 7851d410de50..17ac6933a00e 100644
--- a/python/pyspark/sql/connect/proto/common_pb2.py
+++ b/python/pyspark/sql/connect/proto/common_pb2.py
@@ -29,7 +29,7 @@ _sym_db = _symbol_database.Default()
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    b'\n\x1aspark/connect/common.proto\x12\rspark.connect"\xb0\x01\n\x0cStorageLevel\x12\x19\n\x08use_disk\x18\x01 \x01(\x08R\x07useDisk\x12\x1d\n\nuse_memory\x18\x02 \x01(\x08R\tuseMemory\x12 \n\x0cuse_off_heap\x18\x03 \x01(\x08R\nuseOffHeap\x12"\n\x0c\x64\x65serialized\x18\x04 \x01(\x08R\x0c\x64\x65serialized\x12 \n\x0breplication\x18\x05 \x01(\x05R\x0breplication"G\n\x13ResourceInformation\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x1c\n\taddresses\x18\x02 \x03(\tR\taddressesB6\n\ [...]
+    b'\n\x1aspark/connect/common.proto\x12\rspark.connect"\xb0\x01\n\x0cStorageLevel\x12\x19\n\x08use_disk\x18\x01 \x01(\x08R\x07useDisk\x12\x1d\n\nuse_memory\x18\x02 \x01(\x08R\tuseMemory\x12 \n\x0cuse_off_heap\x18\x03 \x01(\x08R\nuseOffHeap\x12"\n\x0c\x64\x65serialized\x18\x04 \x01(\x08R\x0c\x64\x65serialized\x12 \n\x0breplication\x18\x05 \x01(\x05R\x0breplication"G\n\x13ResourceInformation\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x1c\n\taddresses\x18\x02 \x03(\tR\taddresses"\xc3 [...]
 )
 
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
@@ -39,8 +39,22 @@ if _descriptor._USE_C_DESCRIPTORS == False:
     DESCRIPTOR._serialized_options = (
         b"\n\036org.apache.spark.connect.protoP\001Z\022internal/generated"
     )
+    _RESOURCEPROFILE_EXECUTORRESOURCESENTRY._options = None
+    _RESOURCEPROFILE_EXECUTORRESOURCESENTRY._serialized_options = b"8\001"
+    _RESOURCEPROFILE_TASKRESOURCESENTRY._options = None
+    _RESOURCEPROFILE_TASKRESOURCESENTRY._serialized_options = b"8\001"
     _STORAGELEVEL._serialized_start = 46
     _STORAGELEVEL._serialized_end = 222
     _RESOURCEINFORMATION._serialized_start = 224
     _RESOURCEINFORMATION._serialized_end = 295
+    _EXECUTORRESOURCEREQUEST._serialized_start = 298
+    _EXECUTORRESOURCEREQUEST._serialized_end = 493
+    _TASKRESOURCEREQUEST._serialized_start = 495
+    _TASKRESOURCEREQUEST._serialized_end = 577
+    _RESOURCEPROFILE._serialized_start = 580
+    _RESOURCEPROFILE._serialized_end = 1001
+    _RESOURCEPROFILE_EXECUTORRESOURCESENTRY._serialized_start = 791
+    _RESOURCEPROFILE_EXECUTORRESOURCESENTRY._serialized_end = 899
+    _RESOURCEPROFILE_TASKRESOURCESENTRY._serialized_start = 901
+    _RESOURCEPROFILE_TASKRESOURCESENTRY._serialized_end = 1001
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/common_pb2.pyi b/python/pyspark/sql/connect/proto/common_pb2.pyi
index bb7bdeddbfd6..163781b41998 100644
--- a/python/pyspark/sql/connect/proto/common_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/common_pb2.pyi
@@ -39,6 +39,7 @@ import google.protobuf.descriptor
 import google.protobuf.internal.containers
 import google.protobuf.message
 import sys
+import typing
 
 if sys.version_info >= (3, 8):
     import typing as typing_extensions
@@ -121,3 +122,177 @@ class ResourceInformation(google.protobuf.message.Message):
     ) -> None: ...
 
 global___ResourceInformation = ResourceInformation
+
+class ExecutorResourceRequest(google.protobuf.message.Message):
+    """An executor resource request."""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    RESOURCE_NAME_FIELD_NUMBER: builtins.int
+    AMOUNT_FIELD_NUMBER: builtins.int
+    DISCOVERY_SCRIPT_FIELD_NUMBER: builtins.int
+    VENDOR_FIELD_NUMBER: builtins.int
+    resource_name: builtins.str
+    """(Required) resource name."""
+    amount: builtins.int
+    """(Required) resource amount requesting."""
+    discovery_script: builtins.str
+    """Optional script used to discover the resources."""
+    vendor: builtins.str
+    """Optional vendor, required for some cluster managers."""
+    def __init__(
+        self,
+        *,
+        resource_name: builtins.str = ...,
+        amount: builtins.int = ...,
+        discovery_script: builtins.str | None = ...,
+        vendor: builtins.str | None = ...,
+    ) -> None: ...
+    def HasField(
+        self,
+        field_name: typing_extensions.Literal[
+            "_discovery_script",
+            b"_discovery_script",
+            "_vendor",
+            b"_vendor",
+            "discovery_script",
+            b"discovery_script",
+            "vendor",
+            b"vendor",
+        ],
+    ) -> builtins.bool: ...
+    def ClearField(
+        self,
+        field_name: typing_extensions.Literal[
+            "_discovery_script",
+            b"_discovery_script",
+            "_vendor",
+            b"_vendor",
+            "amount",
+            b"amount",
+            "discovery_script",
+            b"discovery_script",
+            "resource_name",
+            b"resource_name",
+            "vendor",
+            b"vendor",
+        ],
+    ) -> None: ...
+    @typing.overload
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["_discovery_script", 
b"_discovery_script"]
+    ) -> typing_extensions.Literal["discovery_script"] | None: ...
+    @typing.overload
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["_vendor", b"_vendor"]
+    ) -> typing_extensions.Literal["vendor"] | None: ...
+
+global___ExecutorResourceRequest = ExecutorResourceRequest
+
+class TaskResourceRequest(google.protobuf.message.Message):
+    """A task resource request."""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    RESOURCE_NAME_FIELD_NUMBER: builtins.int
+    AMOUNT_FIELD_NUMBER: builtins.int
+    resource_name: builtins.str
+    """(Required) resource name."""
+    amount: builtins.float
+    """(Required) resource amount requesting as a double to support fractional
+    resource requests.
+    """
+    def __init__(
+        self,
+        *,
+        resource_name: builtins.str = ...,
+        amount: builtins.float = ...,
+    ) -> None: ...
+    def ClearField(
+        self,
+        field_name: typing_extensions.Literal[
+            "amount", b"amount", "resource_name", b"resource_name"
+        ],
+    ) -> None: ...
+
+global___TaskResourceRequest = TaskResourceRequest
+
+class ResourceProfile(google.protobuf.message.Message):
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    class ExecutorResourcesEntry(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        KEY_FIELD_NUMBER: builtins.int
+        VALUE_FIELD_NUMBER: builtins.int
+        key: builtins.str
+        @property
+        def value(self) -> global___ExecutorResourceRequest: ...
+        def __init__(
+            self,
+            *,
+            key: builtins.str = ...,
+            value: global___ExecutorResourceRequest | None = ...,
+        ) -> None: ...
+        def HasField(
+            self, field_name: typing_extensions.Literal["value", b"value"]
+        ) -> builtins.bool: ...
+        def ClearField(
+            self, field_name: typing_extensions.Literal["key", b"key", 
"value", b"value"]
+        ) -> None: ...
+
+    class TaskResourcesEntry(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        KEY_FIELD_NUMBER: builtins.int
+        VALUE_FIELD_NUMBER: builtins.int
+        key: builtins.str
+        @property
+        def value(self) -> global___TaskResourceRequest: ...
+        def __init__(
+            self,
+            *,
+            key: builtins.str = ...,
+            value: global___TaskResourceRequest | None = ...,
+        ) -> None: ...
+        def HasField(
+            self, field_name: typing_extensions.Literal["value", b"value"]
+        ) -> builtins.bool: ...
+        def ClearField(
+            self, field_name: typing_extensions.Literal["key", b"key", 
"value", b"value"]
+        ) -> None: ...
+
+    EXECUTOR_RESOURCES_FIELD_NUMBER: builtins.int
+    TASK_RESOURCES_FIELD_NUMBER: builtins.int
+    @property
+    def executor_resources(
+        self,
+    ) -> google.protobuf.internal.containers.MessageMap[
+        builtins.str, global___ExecutorResourceRequest
+    ]:
+        """(Optional) Resource requests for executors. Mapped from the 
resource name
+        (e.g., cores, memory, CPU) to its specific request.
+        """
+    @property
+    def task_resources(
+        self,
+    ) -> google.protobuf.internal.containers.MessageMap[builtins.str, global___TaskResourceRequest]:
+        """(Optional) Resource requests for tasks. Mapped from the resource name
+        (e.g., cores, memory, CPU) to its specific request.
+        """
+    def __init__(
+        self,
+        *,
+        executor_resources: collections.abc.Mapping[builtins.str, global___ExecutorResourceRequest]
+        | None = ...,
+        task_resources: collections.abc.Mapping[builtins.str, global___TaskResourceRequest]
+        | None = ...,
+    ) -> None: ...
+    def ClearField(
+        self,
+        field_name: typing_extensions.Literal[
+            "executor_resources", b"executor_resources", "task_resources", 
b"task_resources"
+        ],
+    ) -> None: ...
+
+global___ResourceProfile = ResourceProfile
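
Note that discovery_script and vendor above are declared as proto3 optional fields (hence the synthetic _discovery_script / _vendor oneofs), so field presence is tracked explicitly. A small sketch of the resulting semantics; the discovery script path is a hypothetical placeholder.

    from pyspark.sql.connect.proto import common_pb2

    req = common_pb2.ExecutorResourceRequest(resource_name="gpu", amount=2)
    assert not req.HasField("discovery_script")  # unset until explicitly assigned

    req.discovery_script = "/opt/spark/discoverGpus.sh"  # hypothetical path
    assert req.HasField("discovery_script")
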
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.py b/python/pyspark/sql/connect/proto/relations_pb2.py
index 9a38f7efd1e2..ff01d5fe346a 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.py
+++ b/python/pyspark/sql/connect/proto/relations_pb2.py
@@ -32,10 +32,11 @@ from google.protobuf import any_pb2 as google_dot_protobuf_dot_any__pb2
 from pyspark.sql.connect.proto import expressions_pb2 as spark_dot_connect_dot_expressions__pb2
 from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__pb2
 from pyspark.sql.connect.proto import catalog_pb2 as spark_dot_connect_dot_catalog__pb2
+from pyspark.sql.connect.proto import common_pb2 as spark_dot_connect_dot_common__pb2
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto"\xa2\x1a\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01 \x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02 \x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 \x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66il [...]
+    b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto\x1a\x1aspark/connect/common.proto"\xa2\x1a\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01 \x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02 \x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 \x01(\x0b\x32\x16.spark.connect.Project [...]
 )
 
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
@@ -63,152 +64,152 @@ if _descriptor._USE_C_DESCRIPTORS == False:
     _WITHCOLUMNSRENAMED.fields_by_name["rename_columns_map"]._serialized_options = b"\030\001"
     _PARSE_OPTIONSENTRY._options = None
     _PARSE_OPTIONSENTRY._serialized_options = b"8\001"
-    _RELATION._serialized_start = 165
-    _RELATION._serialized_end = 3527
-    _UNKNOWN._serialized_start = 3529
-    _UNKNOWN._serialized_end = 3538
-    _RELATIONCOMMON._serialized_start = 3540
-    _RELATIONCOMMON._serialized_end = 3631
-    _SQL._serialized_start = 3634
-    _SQL._serialized_end = 4112
-    _SQL_ARGSENTRY._serialized_start = 3928
-    _SQL_ARGSENTRY._serialized_end = 4018
-    _SQL_NAMEDARGUMENTSENTRY._serialized_start = 4020
-    _SQL_NAMEDARGUMENTSENTRY._serialized_end = 4112
-    _READ._serialized_start = 4115
-    _READ._serialized_end = 4778
-    _READ_NAMEDTABLE._serialized_start = 4293
-    _READ_NAMEDTABLE._serialized_end = 4485
-    _READ_NAMEDTABLE_OPTIONSENTRY._serialized_start = 4427
-    _READ_NAMEDTABLE_OPTIONSENTRY._serialized_end = 4485
-    _READ_DATASOURCE._serialized_start = 4488
-    _READ_DATASOURCE._serialized_end = 4765
-    _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 4427
-    _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 4485
-    _PROJECT._serialized_start = 4780
-    _PROJECT._serialized_end = 4897
-    _FILTER._serialized_start = 4899
-    _FILTER._serialized_end = 5011
-    _JOIN._serialized_start = 5014
-    _JOIN._serialized_end = 5675
-    _JOIN_JOINDATATYPE._serialized_start = 5353
-    _JOIN_JOINDATATYPE._serialized_end = 5445
-    _JOIN_JOINTYPE._serialized_start = 5448
-    _JOIN_JOINTYPE._serialized_end = 5656
-    _SETOPERATION._serialized_start = 5678
-    _SETOPERATION._serialized_end = 6157
-    _SETOPERATION_SETOPTYPE._serialized_start = 5994
-    _SETOPERATION_SETOPTYPE._serialized_end = 6108
-    _LIMIT._serialized_start = 6159
-    _LIMIT._serialized_end = 6235
-    _OFFSET._serialized_start = 6237
-    _OFFSET._serialized_end = 6316
-    _TAIL._serialized_start = 6318
-    _TAIL._serialized_end = 6393
-    _AGGREGATE._serialized_start = 6396
-    _AGGREGATE._serialized_end = 7162
-    _AGGREGATE_PIVOT._serialized_start = 6811
-    _AGGREGATE_PIVOT._serialized_end = 6922
-    _AGGREGATE_GROUPINGSETS._serialized_start = 6924
-    _AGGREGATE_GROUPINGSETS._serialized_end = 7000
-    _AGGREGATE_GROUPTYPE._serialized_start = 7003
-    _AGGREGATE_GROUPTYPE._serialized_end = 7162
-    _SORT._serialized_start = 7165
-    _SORT._serialized_end = 7325
-    _DROP._serialized_start = 7328
-    _DROP._serialized_end = 7469
-    _DEDUPLICATE._serialized_start = 7472
-    _DEDUPLICATE._serialized_end = 7712
-    _LOCALRELATION._serialized_start = 7714
-    _LOCALRELATION._serialized_end = 7803
-    _CACHEDLOCALRELATION._serialized_start = 7805
-    _CACHEDLOCALRELATION._serialized_end = 7877
-    _CACHEDREMOTERELATION._serialized_start = 7879
-    _CACHEDREMOTERELATION._serialized_end = 7934
-    _SAMPLE._serialized_start = 7937
-    _SAMPLE._serialized_end = 8210
-    _RANGE._serialized_start = 8213
-    _RANGE._serialized_end = 8358
-    _SUBQUERYALIAS._serialized_start = 8360
-    _SUBQUERYALIAS._serialized_end = 8474
-    _REPARTITION._serialized_start = 8477
-    _REPARTITION._serialized_end = 8619
-    _SHOWSTRING._serialized_start = 8622
-    _SHOWSTRING._serialized_end = 8764
-    _HTMLSTRING._serialized_start = 8766
-    _HTMLSTRING._serialized_end = 8880
-    _STATSUMMARY._serialized_start = 8882
-    _STATSUMMARY._serialized_end = 8974
-    _STATDESCRIBE._serialized_start = 8976
-    _STATDESCRIBE._serialized_end = 9057
-    _STATCROSSTAB._serialized_start = 9059
-    _STATCROSSTAB._serialized_end = 9160
-    _STATCOV._serialized_start = 9162
-    _STATCOV._serialized_end = 9258
-    _STATCORR._serialized_start = 9261
-    _STATCORR._serialized_end = 9398
-    _STATAPPROXQUANTILE._serialized_start = 9401
-    _STATAPPROXQUANTILE._serialized_end = 9565
-    _STATFREQITEMS._serialized_start = 9567
-    _STATFREQITEMS._serialized_end = 9692
-    _STATSAMPLEBY._serialized_start = 9695
-    _STATSAMPLEBY._serialized_end = 10004
-    _STATSAMPLEBY_FRACTION._serialized_start = 9896
-    _STATSAMPLEBY_FRACTION._serialized_end = 9995
-    _NAFILL._serialized_start = 10007
-    _NAFILL._serialized_end = 10141
-    _NADROP._serialized_start = 10144
-    _NADROP._serialized_end = 10278
-    _NAREPLACE._serialized_start = 10281
-    _NAREPLACE._serialized_end = 10577
-    _NAREPLACE_REPLACEMENT._serialized_start = 10436
-    _NAREPLACE_REPLACEMENT._serialized_end = 10577
-    _TODF._serialized_start = 10579
-    _TODF._serialized_end = 10667
-    _WITHCOLUMNSRENAMED._serialized_start = 10670
-    _WITHCOLUMNSRENAMED._serialized_end = 11052
-    _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_start = 10914
-    _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_end = 10981
-    _WITHCOLUMNSRENAMED_RENAME._serialized_start = 10983
-    _WITHCOLUMNSRENAMED_RENAME._serialized_end = 11052
-    _WITHCOLUMNS._serialized_start = 11054
-    _WITHCOLUMNS._serialized_end = 11173
-    _WITHWATERMARK._serialized_start = 11176
-    _WITHWATERMARK._serialized_end = 11310
-    _HINT._serialized_start = 11313
-    _HINT._serialized_end = 11445
-    _UNPIVOT._serialized_start = 11448
-    _UNPIVOT._serialized_end = 11775
-    _UNPIVOT_VALUES._serialized_start = 11705
-    _UNPIVOT_VALUES._serialized_end = 11764
-    _TOSCHEMA._serialized_start = 11777
-    _TOSCHEMA._serialized_end = 11883
-    _REPARTITIONBYEXPRESSION._serialized_start = 11886
-    _REPARTITIONBYEXPRESSION._serialized_end = 12089
-    _MAPPARTITIONS._serialized_start = 12092
-    _MAPPARTITIONS._serialized_end = 12273
-    _GROUPMAP._serialized_start = 12276
-    _GROUPMAP._serialized_end = 12911
-    _COGROUPMAP._serialized_start = 12914
-    _COGROUPMAP._serialized_end = 13440
-    _APPLYINPANDASWITHSTATE._serialized_start = 13443
-    _APPLYINPANDASWITHSTATE._serialized_end = 13800
-    _COMMONINLINEUSERDEFINEDTABLEFUNCTION._serialized_start = 13803
-    _COMMONINLINEUSERDEFINEDTABLEFUNCTION._serialized_end = 14047
-    _PYTHONUDTF._serialized_start = 14050
-    _PYTHONUDTF._serialized_end = 14227
-    _COMMONINLINEUSERDEFINEDDATASOURCE._serialized_start = 14230
-    _COMMONINLINEUSERDEFINEDDATASOURCE._serialized_end = 14381
-    _PYTHONDATASOURCE._serialized_start = 14383
-    _PYTHONDATASOURCE._serialized_end = 14458
-    _COLLECTMETRICS._serialized_start = 14461
-    _COLLECTMETRICS._serialized_end = 14597
-    _PARSE._serialized_start = 14600
-    _PARSE._serialized_end = 14988
-    _PARSE_OPTIONSENTRY._serialized_start = 4427
-    _PARSE_OPTIONSENTRY._serialized_end = 4485
-    _PARSE_PARSEFORMAT._serialized_start = 14889
-    _PARSE_PARSEFORMAT._serialized_end = 14977
-    _ASOFJOIN._serialized_start = 14991
-    _ASOFJOIN._serialized_end = 15466
+    _RELATION._serialized_start = 193
+    _RELATION._serialized_end = 3555
+    _UNKNOWN._serialized_start = 3557
+    _UNKNOWN._serialized_end = 3566
+    _RELATIONCOMMON._serialized_start = 3568
+    _RELATIONCOMMON._serialized_end = 3659
+    _SQL._serialized_start = 3662
+    _SQL._serialized_end = 4140
+    _SQL_ARGSENTRY._serialized_start = 3956
+    _SQL_ARGSENTRY._serialized_end = 4046
+    _SQL_NAMEDARGUMENTSENTRY._serialized_start = 4048
+    _SQL_NAMEDARGUMENTSENTRY._serialized_end = 4140
+    _READ._serialized_start = 4143
+    _READ._serialized_end = 4806
+    _READ_NAMEDTABLE._serialized_start = 4321
+    _READ_NAMEDTABLE._serialized_end = 4513
+    _READ_NAMEDTABLE_OPTIONSENTRY._serialized_start = 4455
+    _READ_NAMEDTABLE_OPTIONSENTRY._serialized_end = 4513
+    _READ_DATASOURCE._serialized_start = 4516
+    _READ_DATASOURCE._serialized_end = 4793
+    _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 4455
+    _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 4513
+    _PROJECT._serialized_start = 4808
+    _PROJECT._serialized_end = 4925
+    _FILTER._serialized_start = 4927
+    _FILTER._serialized_end = 5039
+    _JOIN._serialized_start = 5042
+    _JOIN._serialized_end = 5703
+    _JOIN_JOINDATATYPE._serialized_start = 5381
+    _JOIN_JOINDATATYPE._serialized_end = 5473
+    _JOIN_JOINTYPE._serialized_start = 5476
+    _JOIN_JOINTYPE._serialized_end = 5684
+    _SETOPERATION._serialized_start = 5706
+    _SETOPERATION._serialized_end = 6185
+    _SETOPERATION_SETOPTYPE._serialized_start = 6022
+    _SETOPERATION_SETOPTYPE._serialized_end = 6136
+    _LIMIT._serialized_start = 6187
+    _LIMIT._serialized_end = 6263
+    _OFFSET._serialized_start = 6265
+    _OFFSET._serialized_end = 6344
+    _TAIL._serialized_start = 6346
+    _TAIL._serialized_end = 6421
+    _AGGREGATE._serialized_start = 6424
+    _AGGREGATE._serialized_end = 7190
+    _AGGREGATE_PIVOT._serialized_start = 6839
+    _AGGREGATE_PIVOT._serialized_end = 6950
+    _AGGREGATE_GROUPINGSETS._serialized_start = 6952
+    _AGGREGATE_GROUPINGSETS._serialized_end = 7028
+    _AGGREGATE_GROUPTYPE._serialized_start = 7031
+    _AGGREGATE_GROUPTYPE._serialized_end = 7190
+    _SORT._serialized_start = 7193
+    _SORT._serialized_end = 7353
+    _DROP._serialized_start = 7356
+    _DROP._serialized_end = 7497
+    _DEDUPLICATE._serialized_start = 7500
+    _DEDUPLICATE._serialized_end = 7740
+    _LOCALRELATION._serialized_start = 7742
+    _LOCALRELATION._serialized_end = 7831
+    _CACHEDLOCALRELATION._serialized_start = 7833
+    _CACHEDLOCALRELATION._serialized_end = 7905
+    _CACHEDREMOTERELATION._serialized_start = 7907
+    _CACHEDREMOTERELATION._serialized_end = 7962
+    _SAMPLE._serialized_start = 7965
+    _SAMPLE._serialized_end = 8238
+    _RANGE._serialized_start = 8241
+    _RANGE._serialized_end = 8386
+    _SUBQUERYALIAS._serialized_start = 8388
+    _SUBQUERYALIAS._serialized_end = 8502
+    _REPARTITION._serialized_start = 8505
+    _REPARTITION._serialized_end = 8647
+    _SHOWSTRING._serialized_start = 8650
+    _SHOWSTRING._serialized_end = 8792
+    _HTMLSTRING._serialized_start = 8794
+    _HTMLSTRING._serialized_end = 8908
+    _STATSUMMARY._serialized_start = 8910
+    _STATSUMMARY._serialized_end = 9002
+    _STATDESCRIBE._serialized_start = 9004
+    _STATDESCRIBE._serialized_end = 9085
+    _STATCROSSTAB._serialized_start = 9087
+    _STATCROSSTAB._serialized_end = 9188
+    _STATCOV._serialized_start = 9190
+    _STATCOV._serialized_end = 9286
+    _STATCORR._serialized_start = 9289
+    _STATCORR._serialized_end = 9426
+    _STATAPPROXQUANTILE._serialized_start = 9429
+    _STATAPPROXQUANTILE._serialized_end = 9593
+    _STATFREQITEMS._serialized_start = 9595
+    _STATFREQITEMS._serialized_end = 9720
+    _STATSAMPLEBY._serialized_start = 9723
+    _STATSAMPLEBY._serialized_end = 10032
+    _STATSAMPLEBY_FRACTION._serialized_start = 9924
+    _STATSAMPLEBY_FRACTION._serialized_end = 10023
+    _NAFILL._serialized_start = 10035
+    _NAFILL._serialized_end = 10169
+    _NADROP._serialized_start = 10172
+    _NADROP._serialized_end = 10306
+    _NAREPLACE._serialized_start = 10309
+    _NAREPLACE._serialized_end = 10605
+    _NAREPLACE_REPLACEMENT._serialized_start = 10464
+    _NAREPLACE_REPLACEMENT._serialized_end = 10605
+    _TODF._serialized_start = 10607
+    _TODF._serialized_end = 10695
+    _WITHCOLUMNSRENAMED._serialized_start = 10698
+    _WITHCOLUMNSRENAMED._serialized_end = 11080
+    _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_start = 10942
+    _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_end = 11009
+    _WITHCOLUMNSRENAMED_RENAME._serialized_start = 11011
+    _WITHCOLUMNSRENAMED_RENAME._serialized_end = 11080
+    _WITHCOLUMNS._serialized_start = 11082
+    _WITHCOLUMNS._serialized_end = 11201
+    _WITHWATERMARK._serialized_start = 11204
+    _WITHWATERMARK._serialized_end = 11338
+    _HINT._serialized_start = 11341
+    _HINT._serialized_end = 11473
+    _UNPIVOT._serialized_start = 11476
+    _UNPIVOT._serialized_end = 11803
+    _UNPIVOT_VALUES._serialized_start = 11733
+    _UNPIVOT_VALUES._serialized_end = 11792
+    _TOSCHEMA._serialized_start = 11805
+    _TOSCHEMA._serialized_end = 11911
+    _REPARTITIONBYEXPRESSION._serialized_start = 11914
+    _REPARTITIONBYEXPRESSION._serialized_end = 12117
+    _MAPPARTITIONS._serialized_start = 12120
+    _MAPPARTITIONS._serialized_end = 12352
+    _GROUPMAP._serialized_start = 12355
+    _GROUPMAP._serialized_end = 12990
+    _COGROUPMAP._serialized_start = 12993
+    _COGROUPMAP._serialized_end = 13519
+    _APPLYINPANDASWITHSTATE._serialized_start = 13522
+    _APPLYINPANDASWITHSTATE._serialized_end = 13879
+    _COMMONINLINEUSERDEFINEDTABLEFUNCTION._serialized_start = 13882
+    _COMMONINLINEUSERDEFINEDTABLEFUNCTION._serialized_end = 14126
+    _PYTHONUDTF._serialized_start = 14129
+    _PYTHONUDTF._serialized_end = 14306
+    _COMMONINLINEUSERDEFINEDDATASOURCE._serialized_start = 14309
+    _COMMONINLINEUSERDEFINEDDATASOURCE._serialized_end = 14460
+    _PYTHONDATASOURCE._serialized_start = 14462
+    _PYTHONDATASOURCE._serialized_end = 14537
+    _COLLECTMETRICS._serialized_start = 14540
+    _COLLECTMETRICS._serialized_end = 14676
+    _PARSE._serialized_start = 14679
+    _PARSE._serialized_end = 15067
+    _PARSE_OPTIONSENTRY._serialized_start = 4455
+    _PARSE_OPTIONSENTRY._serialized_end = 4513
+    _PARSE_PARSEFORMAT._serialized_start = 14968
+    _PARSE_PARSEFORMAT._serialized_end = 15056
+    _ASOFJOIN._serialized_start = 15070
+    _ASOFJOIN._serialized_end = 15545
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.pyi b/python/pyspark/sql/connect/proto/relations_pb2.pyi
index 5564084af019..db9609eebb85 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/relations_pb2.pyi
@@ -3164,6 +3164,7 @@ class MapPartitions(google.protobuf.message.Message):
     INPUT_FIELD_NUMBER: builtins.int
     FUNC_FIELD_NUMBER: builtins.int
     IS_BARRIER_FIELD_NUMBER: builtins.int
+    PROFILE_ID_FIELD_NUMBER: builtins.int
     @property
     def input(self) -> global___Relation:
         """(Required) Input relation for a mapPartitions-equivalent API: 
mapInPandas, mapInArrow."""
@@ -3172,6 +3173,8 @@ class MapPartitions(google.protobuf.message.Message):
         """(Required) Input user-defined function."""
     is_barrier: builtins.bool
     """(Optional) Whether to use barrier mode execution or not."""
+    profile_id: builtins.int
+    """(Optional) ResourceProfile id used for the stage level scheduling."""
     def __init__(
         self,
         *,
@@ -3179,18 +3182,23 @@ class MapPartitions(google.protobuf.message.Message):
        func: pyspark.sql.connect.proto.expressions_pb2.CommonInlineUserDefinedFunction
         | None = ...,
         is_barrier: builtins.bool | None = ...,
+        profile_id: builtins.int | None = ...,
     ) -> None: ...
     def HasField(
         self,
         field_name: typing_extensions.Literal[
             "_is_barrier",
             b"_is_barrier",
+            "_profile_id",
+            b"_profile_id",
             "func",
             b"func",
             "input",
             b"input",
             "is_barrier",
             b"is_barrier",
+            "profile_id",
+            b"profile_id",
         ],
     ) -> builtins.bool: ...
     def ClearField(
@@ -3198,17 +3206,26 @@ class MapPartitions(google.protobuf.message.Message):
         field_name: typing_extensions.Literal[
             "_is_barrier",
             b"_is_barrier",
+            "_profile_id",
+            b"_profile_id",
             "func",
             b"func",
             "input",
             b"input",
             "is_barrier",
             b"is_barrier",
+            "profile_id",
+            b"profile_id",
         ],
     ) -> None: ...
+    @typing.overload
     def WhichOneof(
         self, oneof_group: typing_extensions.Literal["_is_barrier", 
b"_is_barrier"]
     ) -> typing_extensions.Literal["is_barrier"] | None: ...
+    @typing.overload
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["_profile_id", 
b"_profile_id"]
+    ) -> typing_extensions.Literal["profile_id"] | None: ...
 
 global___MapPartitions = MapPartitions
 
diff --git a/python/pyspark/sql/connect/resource/__init__.py b/python/pyspark/sql/connect/resource/__init__.py
new file mode 100644
index 000000000000..cce3acad34a4
--- /dev/null
+++ b/python/pyspark/sql/connect/resource/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/python/pyspark/sql/connect/resource/profile.py b/python/pyspark/sql/connect/resource/profile.py
new file mode 100644
index 000000000000..c97b75476a58
--- /dev/null
+++ b/python/pyspark/sql/connect/resource/profile.py
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import Optional, Dict
+
+from pyspark.resource import ExecutorResourceRequest, TaskResourceRequest
+
+import pyspark.sql.connect.proto as pb2
+
+
+class ResourceProfile:
+    """The internal ResourceProfile is used to create the Spark ResourceProfile
+    on the server side and store the generated profile id."""
+
+    def __init__(
+        self,
+        exec_req: Optional[Dict[str, ExecutorResourceRequest]] = None,
+        task_req: Optional[Dict[str, TaskResourceRequest]] = None,
+    ):
+        from pyspark.sql.connect.session import SparkSession
+
+        session = SparkSession.getActiveSession()
+        if session is None:
+            raise RuntimeError(
+                "SparkSession should be initialized before ResourceProfile 
creation."
+            )
+
+        exec_req = exec_req or {}
+        task_req = task_req or {}
+
+        self._exec_req = {}
+        self._task_req = {}
+
+        for key, value in exec_req.items():
+            self._exec_req[key] = pb2.ExecutorResourceRequest(
+                resource_name=value.resourceName,
+                amount=value.amount,
+                discovery_script=value.discoveryScript,
+                vendor=value.vendor,
+            )
+
+        for k, v in task_req.items():
+            self._task_req[k] = pb2.TaskResourceRequest(
+                resource_name=v.resourceName, amount=v.amount
+            )
+
+        self._remote_profile = pb2.ResourceProfile(
+            executor_resources=self._exec_req, task_resources=self._task_req
+        )
+
+        self._id = session.client._create_profile(self._remote_profile)
+
+    @property
+    def id(self) -> int:
+        return self._id
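
For context, a hedged end-to-end sketch of how this internal class gets exercised: the public pyspark.resource builder produces the profile, and the connect DataFrame API accepts it through the profile parameter. The `spark` session and the resource amounts below are assumptions for illustration.

    from pyspark.resource import ResourceProfileBuilder, TaskResourceRequests

    def double(iterator):
        for pdf in iterator:
            yield pdf.assign(id=pdf.id * 2)

    treqs = TaskResourceRequests().cpus(2)
    rp = ResourceProfileBuilder().require(treqs).build  # .build is a property

    df = spark.range(10)  # `spark` assumed to be a Spark Connect session
    df.mapInPandas(double, "id long", profile=rp).show()
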
diff --git a/python/pyspark/sql/tests/connect/test_resources.py b/python/pyspark/sql/tests/connect/test_resources.py
new file mode 100644
index 000000000000..b4cc138c4df8
--- /dev/null
+++ b/python/pyspark/sql/tests/connect/test_resources.py
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+
+from pyspark.testing.connectutils import ReusedConnectTestCase
+from pyspark.sql.tests.test_resources import ResourceProfileTestsMixin
+
+
+class ResourceProfileTests(ResourceProfileTestsMixin, ReusedConnectTestCase):
+    @classmethod
+    def master(cls):
+        return "local-cluster[1, 4, 1024]"
+
+
+if __name__ == "__main__":
+    from pyspark.sql.tests.connect.test_resources import *  # noqa: F401
+
+    try:
+        import xmlrunner  # type: ignore[import]
+
+        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", 
verbosity=2)
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
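
Finally, a hypothetical standalone check in the spirit of the shared mixin: build a profile against a connect session and confirm a server-side id materializes. The resource amounts are illustrative.

    from pyspark.resource import ExecutorResourceRequests, ResourceProfileBuilder

    ereqs = ExecutorResourceRequests().cores(2).memory("1g")
    rp = ResourceProfileBuilder().require(ereqs).build
    assert rp.id >= 0  # under Spark Connect, the id is created lazily on the server
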

