wbo4958 commented on PR #45232:
URL: https://github.com/apache/spark/pull/45232#issuecomment-1963261901

   # Manual tests
   
   
   The manual tests were conducted on a Spark standalone cluster with a single worker that has 6 CPU cores.
   
   ## With dynamic allocation disabled.
   
   ``` bash
   start-connect-server.sh --master spark://192.168.0.106:7077 \
      --jars jars/spark-connect_2.13-4.0.0-SNAPSHOT.jar  \
         --conf spark.executor.cores=4 \
         --conf spark.task.cpus=1 \
         --conf spark.dynamicAllocation.enabled=false
   ```
   
   The above command starts the Connect server with 1 executor that has 4 CPU cores. With the default `task.cpus = 1`, the default task parallelism is 4 tasks at a time (a quick check of this is sketched after the launch command below).
   
   Then launch the Spark Connect PySpark client with:
   
   ``` bash
   pyspark --remote "sc://localhost"
   ```
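
   As a quick sanity check, the expected default parallelism can be derived from the session configuration in the client shell. This is a minimal sketch; it assumes `spark.executor.cores` and `spark.task.cpus` are both set on the server, as in the launch command above:

   ``` python
   # Hypothetical check run in the Spark Connect client shell.
   executor_cores = int(spark.conf.get("spark.executor.cores"))  # 4 in this setup
   task_cpus = int(spark.conf.get("spark.task.cpus"))            # 1 here
   print(executor_cores // task_cpus)  # default task parallelism: 4
   ```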
   
   1. `task.cpus=1`
   
   Test code:
   
   ``` python
   from pyspark.resource import TaskResourceRequests, ResourceProfileBuilder

   # Identity pass-through: yield each Arrow batch unchanged.
   def filter_func(iterator):
       for pdf in iterator:
           yield pdf

   df = spark.range(0, 100, 1, 6)
   # Require 1 CPU per task for the mapInArrow stage.
   treqs = TaskResourceRequests().cpus(1)
   rp = ResourceProfileBuilder().require(treqs).build
   df.repartition(3).mapInArrow(filter_func, df.schema, False, rp).collect()
   ```
   
   When the required `task.cpus=1` and `executor.cores=4` (no executor resources are specified, so the default executor is used), 4 tasks run concurrently for this resource profile.
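
   For reference, the task requirement carried by the profile can be read back on the client. This is a minimal sketch, assuming the `ResourceProfile` built above exposes its task requests via `taskResources`, and it hard-codes the 4 executor cores from this setup:

   ``` python
   # Inspect the task CPU request stored in the resource profile built above.
   task_cpus = rp.taskResources["cpus"].amount   # 1.0 for this test
   print(int(4 // task_cpus))                    # expected concurrent tasks: 4
   ```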
   
   The entire Spark application consists of a single job that is divided into two stages. The first shuffle stage comprises 6 tasks: the first 4 run simultaneously, followed by the remaining 2.
   
   
![1](https://github.com/apache/spark/assets/1320706/720fef7b-3a72-456f-9c60-01b86011ec84)
   
   The second ResultStage comprises 3 tasks, all of which run simultaneously since the required `task.cpus` is 1.
   
   
![2](https://github.com/apache/spark/assets/1320706/0804d2af-e25d-4f54-906e-cc367a6aa2eb)
   
   2. `task.cpus=2`
   
   Test code:
   
   ``` python
   from pyspark.resource import TaskResourceRequests, ResourceProfileBuilder

   # Identity pass-through: yield each Arrow batch unchanged.
   def filter_func(iterator):
       for pdf in iterator:
           yield pdf

   df = spark.range(0, 100, 1, 6)
   # Require 2 CPUs per task for the mapInArrow stage.
   treqs = TaskResourceRequests().cpus(2)
   rp = ResourceProfileBuilder().require(treqs).build
   df.repartition(3).mapInArrow(filter_func, df.schema, False, rp).collect()
   ```
   
   When the required `task.cpus=2` and `executor.cores=4` (no executor resources are specified, so the default executor is used), 2 tasks run concurrently for this resource profile.
   
   The first shuffle stage behaves the same as in the previous test.
   
   The second ResultStage comprises 3 tasks: the first 2 run at the same time, and then the last task runs.
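
   In other words, the ResultStage runs in two waves. A minimal sketch of that arithmetic, using the numbers from this test:

   ``` python
   import math

   num_tasks = 3        # ResultStage tasks after repartition(3)
   concurrent = 4 // 2  # executor.cores // requested task.cpus
   print(math.ceil(num_tasks / concurrent))  # 2 waves: 2 tasks, then 1
   ```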
   
   
![3](https://github.com/apache/spark/assets/1320706/870fedb2-52f0-4a54-9b23-f37b9d2a2228)
   
   3. `task.cpus=3`
   
   Test code:
   
   ``` python
   from pyspark.resource import TaskResourceRequests, ResourceProfileBuilder

   # Identity pass-through: yield each Arrow batch unchanged.
   def filter_func(iterator):
       for pdf in iterator:
           yield pdf

   df = spark.range(0, 100, 1, 6)
   # Require 3 CPUs per task for the mapInArrow stage.
   treqs = TaskResourceRequests().cpus(3)
   rp = ResourceProfileBuilder().require(treqs).build
   df.repartition(3).mapInArrow(filter_func, df.schema, False, rp).collect()
   ```
   
   When the required `task.cpus=3` and `executor.cores=4` (no executor resources are specified, so the default executor is used), only 1 task runs at a time for this resource profile.
   
   The first shuffle stage behaves the same as in the previous tests.
   
   The second ResultStage comprises 3 tasks, all of which run serially.
   
   
![4](https://github.com/apache/spark/assets/1320706/a6b730ab-99d5-4563-a853-0682fcd3a10d)
   
   
   4. `task.cpus=5`
   
   ``` python
   from pyspark.resource import TaskResourceRequests, ResourceProfileBuilder

   # Identity pass-through: yield each Arrow batch unchanged.
   def filter_func(iterator):
       for pdf in iterator:
           yield pdf

   df = spark.range(0, 100, 1, 6)
   # Require 5 CPUs per task, which exceeds the executor's 4 cores.
   treqs = TaskResourceRequests().cpus(5)
   rp = ResourceProfileBuilder().require(treqs).build
   df.repartition(3).mapInArrow(filter_func, df.schema, False, rp).collect()
   ```
   
   An exception occurred:
   ``` console
   Traceback (most recent call last):
     File "<stdin>", line 1, in <module>
     File "/home/bobwang/github/mytools/spark.home/spark-4.0.0-SNAPSHOT-bin-wbo4958-spark/python/pyspark/sql/connect/dataframe.py", line 1763, in collect
       table, schema = self._to_table()
     File "/home/bobwang/github/mytools/spark.home/spark-4.0.0-SNAPSHOT-bin-wbo4958-spark/python/pyspark/sql/connect/dataframe.py", line 1774, in _to_table
       query = self._plan.to_proto(self._session.client)
     File "/home/bobwang/github/mytools/spark.home/spark-4.0.0-SNAPSHOT-bin-wbo4958-spark/python/pyspark/sql/connect/plan.py", line 127, in to_proto
       plan.root.CopyFrom(self.plan(session))
     File "/home/xxx/github/mytools/spark.home/spark-4.0.0-SNAPSHOT-bin-wbo4958-spark/python/pyspark/sql/connect/plan.py", line 2201, in plan
       plan.map_partitions.profile_id = self._profile.id
     File "/home/bobwang/github/mytools/spark.home/spark-4.0.0-SNAPSHOT-bin-wbo4958-spark/python/pyspark/resource/profile.py", line 132, in id
       rp = _ResourceProfile(
     File "/home/bobwang/github/mytools/spark.home/spark-4.0.0-SNAPSHOT-bin-wbo4958-spark/python/pyspark/sql/connect/resource/profile.py", line 65, in __init__
       self._id = session.client.build_resource_profile(self._remote_profile)
     File "/home/bobwang/github/mytools/spark.home/spark-4.0.0-SNAPSHOT-bin-wbo4958-spark/python/pyspark/sql/connect/client/core.py", line 1741, in build_resource_profile
       resp = self._stub.BuildResourceProfile(req)
     File "/home/bobwang/anaconda3/envs/pyspark/lib/python3.10/site-packages/grpc/_channel.py", line 1160, in __call__
       return _end_unary_response_blocking(state, call, False, None)
     File "/home/bobwang/anaconda3/envs/pyspark/lib/python3.10/site-packages/grpc/_channel.py", line 1003, in _end_unary_response_blocking
       raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
   grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.INTERNAL
        details = "The number of cores per executor (=4) has to be >= the number of cpus per task = 5."
        debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"The number of cores per executor (=4) has to be >= the number of cpus per task = 5.", grpc_status:13, created_time:"2024-02-26T10:42:37.331616664+08:00"}"
   ```
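
   The failure is expected: the requested `task.cpus=5` exceeds the executor's 4 cores, so the server rejects the profile when it is built. A hypothetical client-side guard is sketched below; the hard-coded `executor_cores = 4` mirrors this cluster's launch command and is an assumption, not something queried from the server:

   ``` python
   from pyspark.resource import TaskResourceRequests, ResourceProfileBuilder

   # Hypothetical guard: skip building a profile the executor cannot satisfy.
   executor_cores = 4       # assumed: matches --conf spark.executor.cores=4 above
   requested_task_cpus = 5

   if requested_task_cpus > executor_cores:
       print(f"Skipping profile: task cpus {requested_task_cpus} > executor cores {executor_cores}")
   else:
       treqs = TaskResourceRequests().cpus(requested_task_cpus)
       rp = ResourceProfileBuilder().require(treqs).build
   ```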
   
   

