[ https://issues.apache.org/jira/browse/SPARK-39290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17542308#comment-17542308 ]

Hyukjin Kwon commented on SPARK-39290:
--------------------------------------

It creates partitions per group. For questions, please interact with the Spark 
mailing list before filing an issue.
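
For reference, a minimal sketch of the behavior described above (with hypothetical data and names, not taken from this report): every distinct value of the grouping column is collected and handed to the UDF as a single pandas DataFrame, so the output contains one row per group.

{code:python}
import pandas as pd
from pyspark.sql import SparkSession

# Hypothetical toy data: two columns are enough to show the grouping behavior.
spark = SparkSession.builder.master("local[4]").appName("sketch").getOrCreate()
df = spark.createDataFrame(
    [(0, "a"), (0, "b"), (1, "c"), (2, "d"), (3, "e")], ("id", "path"))

def count_rows(key, pdf):
    # "key" is a tuple with the grouping value(s); "pdf" holds all rows of that group.
    return pd.DataFrame([key + (len(pdf),)])

# One output row per distinct "id": each group is processed as a single unit.
df.groupby("id").applyInPandas(count_rows, schema="id long, n long").show()
{code}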

> Question of job division in "df.groupBy().applyInPandas"
> --------------------------------------------------------
>
>                 Key: SPARK-39290
>                 URL: https://issues.apache.org/jira/browse/SPARK-39290
>             Project: Spark
>          Issue Type: Question
>          Components: PySpark
>    Affects Versions: 3.2.1
>         Environment: python 3.8
> pyspark 3.2.1
> pyarrow 7.0.0
> hadoop 3.3.2
>            Reporter: YuNing Liu
>            Priority: Major
>         Attachments: DAG.png, WebUI.png, job1.png, job2.png
>
>
> My program runs on Spark on YARN with four nodes in total. Both 
> num-executors and executor-cores are set to 4. When I use 
> "df.groupBy().applyInPandas" to process my dataframe, the work is always 
> divided into two jobs: the first job contains only one task and the second 
> job contains three tasks. The two jobs have the same DAG and perform exactly 
> the same operations, only on different data, so the execution time of my 
> program is roughly doubled. I would expect a single job with four tasks. 
> This problem has bothered me for a long time and I can't find the reason. 
> The code below is a simple example I use to test. My dataframe stores 
> information about image data saved on HDFS, in three columns: "id", "path" 
> and "category". "id" indicates the node on which the image is located, 
> "path" the specific path, and "category" the image category. Job1 and job2 
> in "WebUI.png" correspond to the two jobs; "job1.png" and "job2.png" show 
> the executor and node information of the two jobs.
> {code:java}
> import os
> import pandas as pd
> from pyspark.sql import SparkSession
> from pyhdfs import HdfsClient
> spark = SparkSession.builder.appName("test"). \
>     config("spark.sql.shuffle.partitions", "40"). \
>     config("spark.default.parallelism", "40"). \
>     config("spark.sql.execution.arrow.pyspark.enabled", "true"). \
>     getOrCreate()
> def process(key, paths):
>     a = len(paths.path)
>     for i in range(100000000):
>         a+=1
>     a = str(a)
>     return pd.DataFrame([key+(a,)])
> if __name__ == '__main__':
>     sc = spark.sparkContext
>     client = HdfsClient(hosts="master:9870", user_name="hadoop")
>     dataset_dir = "/Datasets"
>     files_pd = pd.DataFrame()
>     for slave, per_dataset_dir in enumerate(client.listdir(dataset_dir)):
>         child_path = os.path.join(dataset_dir, per_dataset_dir)
>         files = pd.DataFrame([
>             [slave, os.path.join(str(child_path), str(child_dir_name), str(filename)), index]
>             for index, child_dir_name in enumerate(client.listdir(child_path))
>             for filename in client.listdir(os.path.join(child_path, child_dir_name))])
>         files_pd = pd.concat([files_pd, files])
>     files_pd = files_pd.sample(frac=1).reset_index(drop=True)
>     spark_files = spark.createDataFrame(files_pd, ("id", "path", "category"))
>     result = spark_files.groupby("id").applyInPandas(process, schema="id long, path string")
>     result.show()
>  {code}
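
A side note for readers of this thread, not part of the report above: assuming the "result" DataFrame from the snippet is still in scope, tagging each output row with spark_partition_id() shows which partition, and therefore which task, processed each group.

{code:python}
from pyspark.sql.functions import spark_partition_id

# No shuffle happens after applyInPandas, so the partition id of each output
# row identifies the task that processed that group.
result.withColumn("pid", spark_partition_id()).select("id", "pid").show()

# The number of result partitions is bounded by spark.sql.shuffle.partitions
# (40 in the snippet above) and may be reduced by adaptive query execution.
print(result.rdd.getNumPartitions())
{code}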



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
