[ https://issues.apache.org/jira/browse/SPARK-39290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
YuNing Liu updated SPARK-39290:
-------------------------------
    Attachment: job2.png

> Question of job division in "df.groupBy().applyInPandas"
> --------------------------------------------------------
>
>                 Key: SPARK-39290
>                 URL: https://issues.apache.org/jira/browse/SPARK-39290
>             Project: Spark
>          Issue Type: Question
>          Components: PySpark
>    Affects Versions: 3.2.1
>         Environment: python 3.8
>                      pyspark 3.2.1
>                      pyarrow 7.0.0
>                      hadoop 3.3.2
>            Reporter: YuNing Liu
>            Priority: Major
>         Attachments: DAG.png, WebUI.png, job1.png, job2.png
>
> My program runs on Spark on YARN with four nodes in total. num-executors is set to 4, and so is executor-cores. When I use "df.groupBy().applyInPandas" to process my DataFrame, the work is always split into two jobs: the first job contains only one task and the second contains three. The two jobs have identical DAGs and perform exactly the same operations, only on different data, so my program takes roughly twice as long as it should. I would expect a single job with four tasks. This problem has bothered me for a long time and I cannot find the cause.
>
> The code below is the simple example I use to test (a self-contained sketch of the same pattern, without the HDFS dependency, follows it). My DataFrame holds information about image data stored on HDFS and has three columns: "id", "path" and "category". "id" is the node on which the image is located, "path" is the full path, and "category" is the image category. Job1 and job2 in WebUI.png correspond to the two jobs.
> {code:python}
> import os
> import pandas as pd
> from pyspark.sql import SparkSession
> from pyhdfs import HdfsClient
>
> spark = SparkSession.builder.appName("test"). \
>     config("spark.sql.shuffle.partitions", "40"). \
>     config("spark.default.parallelism", "40"). \
>     config("spark.sql.execution.arrow.pyspark.enabled", "true"). \
>     getOrCreate()
>
>
> def process(key, paths):
>     a = len(paths.path)
>     # busy loop so each group's task takes a noticeable amount of time
>     for i in range(100000000):
>         a += 1
>     a = str(a)
>     return pd.DataFrame([key + (a,)])
>
>
> if __name__ == '__main__':
>     sc = spark.sparkContext
>     client = HdfsClient(hosts="master:9870", user_name="hadoop")
>     dataset_dir = "/Datasets"
>     files_pd = pd.DataFrame()
>     # one row per image: [node id, full HDFS path, category index]
>     for slave, per_dataset_dir in enumerate(client.listdir(dataset_dir)):
>         child_path = os.path.join(dataset_dir, per_dataset_dir)
>         files = pd.DataFrame([
>             [slave, os.path.join(str(child_path), str(child_dir_name), str(filename)), index]
>             for index, child_dir_name in enumerate(client.listdir(child_path))
>             for filename in client.listdir(os.path.join(child_path, child_dir_name))])
>         files_pd = pd.concat([files_pd, files])
>     files_pd = files_pd.sample(frac=1).reset_index(drop=True)
>     spark_files = spark.createDataFrame(files_pd, ("id", "path", "category"))
>     result = spark_files.groupby("id").applyInPandas(process, schema="id long, path string")
>     result.show()
> {code}
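>
> For anyone who wants to reproduce the pattern without an HDFS cluster, here is a minimal, self-contained sketch of the same groupBy().applyInPandas call. The in-memory data, the paths, and the row count returned by process are made up purely for illustration; only the schema string mirrors the code above:
> {code:python}
> import pandas as pd
> from pyspark.sql import SparkSession
>
> spark = SparkSession.builder.appName("applyInPandas-sketch").getOrCreate()
>
> # Hypothetical stand-in for the HDFS file listing: four "id" groups,
> # two rows each, with made-up paths and categories.
> pdf = pd.DataFrame({
>     "id": [0, 0, 1, 1, 2, 2, 3, 3],
>     "path": ["/Datasets/node%d/img%d.png" % (i, j) for i in range(4) for j in range(2)],
>     "category": [0, 1] * 4,
> })
> df = spark.createDataFrame(pdf)
>
>
> def process(key, group):
>     # one output row per group: the group key and the group's row count
>     return pd.DataFrame([key + (str(len(group)),)])
>
>
> out = df.groupby("id").applyInPandas(process, schema="id long, path string")
> out.show()
> {code}
> With four distinct "id" values this produces four groups, so the job/task breakdown in the Spark UI can be compared against the full example above.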