[ https://issues.apache.org/jira/browse/SPARK-39290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
YuNing Liu updated SPARK-39290: ------------------------------- Description: My program runs in spark on Yarn environment and has four nodes in total. My num executors is set to 4, and so is the executor cores. When I use "df.groupBy().applyInPandas" to process my dataframe, the program is always divided into two jobs. The first job contains only one task, and the second job contains three tasks. And the two jobs have the same DAG diagram and perform exactly the same operations, but the data is different. As a result, the execution time of my program is about doubled. It should be that only one job contains four tasks. This problem has bothered me for a long time and I can't find the reason. This code is a simple example I use to test. My dataframe stores the information of the image data saved by HDFS, including three columns: "Id", "path" and "category". "Id" indicates the node on which the image is located, "path" indicates the specific path, "category" indicates the image category. Job1 and job2 in "WebUI.png" correspond to the two divided jobs. "job1". "Job1.png" and "job2. png" show the executor and node information of the two jobs. {code:java} import os import pandas as pd from pyspark.sql import SparkSession from pyhdfs import HdfsClient spark = SparkSession.builder.appName("test"). \ config("spark.sql.shuffle.partitions", "40"). \ config("spark.default.parallelism", "40"). \ config("spark.sql.execution.arrow.pyspark.enabled", "true"). \ getOrCreate() def process(key, paths): a = len(paths.path) for i in range(100000000): a+=1 a = str(a) return pd.DataFrame([key+(a,)]) if __name__ == '__main__': sc = spark.sparkContext client = HdfsClient(hosts="master:9870", user_name="hadoop") dataset_dir = "/Datasets" files_pd = pd.DataFrame() for slave, per_dataset_dir in enumerate(client.listdir(dataset_dir)): child_path = os.path.join(dataset_dir, per_dataset_dir) files = pd.DataFrame([ [slave, os.path.join(str(child_path), str(child_dir_name), str(filename)), index] for index, child_dir_name in enumerate(client.listdir(child_path)) for filename in client.listdir(os.path.join(child_path, child_dir_name))]) files_pd = pd.concat([files_pd, files]) files_pd = files_pd.sample(frac=1).reset_index(drop=True) spark_files = spark.createDataFrame(files_pd, ("id", "path", "category")) result = spark_files.groupby("id").applyInPandas(process, schema="id long, path string") result.show() {code} was: My program runs in spark on Yarn environment and has four nodes in total. My num executors is set to 4, and so is the executor cores. When I use "df.groupBy().applyInPandas" to process my dataframe, the program is always divided into two jobs. The first job contains only one task, and the second job contains three tasks. And the two jobs have the same DAG diagram and perform exactly the same operations, but the data is different. As a result, the execution time of my program is about doubled. It should be that only one job contains four tasks. This problem has bothered me for a long time and I can't find the reason. This code is a simple example I use to test. My dataframe stores the information of the image data saved by HDFS, including three columns: "Id", "path" and "category". "Id" indicates the node on which the image is located, "path" indicates the specific path, "category" indicates the image category. Job1 and job2 in "WebUI.png" correspond to the two divided jobs. {code:java} import os import pandas as pd from pyspark.sql import SparkSession from pyhdfs import HdfsClient spark = SparkSession.builder.appName("test"). \ config("spark.sql.shuffle.partitions", "40"). \ config("spark.default.parallelism", "40"). \ config("spark.sql.execution.arrow.pyspark.enabled", "true"). \ getOrCreate() def process(key, paths): a = len(paths.path) for i in range(100000000): a+=1 a = str(a) return pd.DataFrame([key+(a,)]) if __name__ == '__main__': sc = spark.sparkContext client = HdfsClient(hosts="master:9870", user_name="hadoop") dataset_dir = "/Datasets" files_pd = pd.DataFrame() for slave, per_dataset_dir in enumerate(client.listdir(dataset_dir)): child_path = os.path.join(dataset_dir, per_dataset_dir) files = pd.DataFrame([ [slave, os.path.join(str(child_path), str(child_dir_name), str(filename)), index] for index, child_dir_name in enumerate(client.listdir(child_path)) for filename in client.listdir(os.path.join(child_path, child_dir_name))]) files_pd = pd.concat([files_pd, files]) files_pd = files_pd.sample(frac=1).reset_index(drop=True) spark_files = spark.createDataFrame(files_pd, ("id", "path", "category")) result = spark_files.groupby("id").applyInPandas(process, schema="id long, path string") result.show() {code} > Question of job division in "df.groupBy().applyInPandas" > -------------------------------------------------------- > > Key: SPARK-39290 > URL: https://issues.apache.org/jira/browse/SPARK-39290 > Project: Spark > Issue Type: Question > Components: PySpark > Affects Versions: 3.2.1 > Environment: python 3.8 > pyspark 3.2.1 > pyarrow 7.0.0 > hadoop 3.3.2 > Reporter: YuNing Liu > Priority: Major > Attachments: DAG.png, WebUI.png, job1.png, job2.png > > > My program runs in spark on Yarn environment and has four nodes in total. My > num executors is set to 4, and so is the executor cores. When I use > "df.groupBy().applyInPandas" to process my dataframe, the program is always > divided into two jobs. The first job contains only one task, and the second > job contains three tasks. And the two jobs have the same DAG diagram and > perform exactly the same operations, but the data is different. As a result, > the execution time of my program is about doubled. It should be that only one > job contains four tasks. This problem has bothered me for a long time and I > can't find the reason. This code is a simple example I use to test. My > dataframe stores the information of the image data saved by HDFS, including > three columns: "Id", "path" and "category". "Id" indicates the node on which > the image is located, "path" indicates the specific path, "category" > indicates the image category. Job1 and job2 in "WebUI.png" correspond to the > two divided jobs. "job1". "Job1.png" and "job2. png" show the executor and > node information of the two jobs. > {code:java} > import os > import pandas as pd > from pyspark.sql import SparkSession > from pyhdfs import HdfsClient > spark = SparkSession.builder.appName("test"). \ > config("spark.sql.shuffle.partitions", "40"). \ > config("spark.default.parallelism", "40"). \ > config("spark.sql.execution.arrow.pyspark.enabled", "true"). \ > getOrCreate() > def process(key, paths): > a = len(paths.path) > for i in range(100000000): > a+=1 > a = str(a) > return pd.DataFrame([key+(a,)]) > if __name__ == '__main__': > sc = spark.sparkContext > client = HdfsClient(hosts="master:9870", user_name="hadoop") > dataset_dir = "/Datasets" > files_pd = pd.DataFrame() > for slave, per_dataset_dir in enumerate(client.listdir(dataset_dir)): > child_path = os.path.join(dataset_dir, per_dataset_dir) > files = pd.DataFrame([ > [slave, os.path.join(str(child_path), str(child_dir_name), > str(filename)), index] > for index, child_dir_name in enumerate(client.listdir(child_path)) > for filename in client.listdir(os.path.join(child_path, > child_dir_name))]) > files_pd = pd.concat([files_pd, files]) > files_pd = files_pd.sample(frac=1).reset_index(drop=True) > spark_files = spark.createDataFrame(files_pd, ("id", "path", "category")) > result = spark_files.groupby("id").applyInPandas(process, schema="id > long, path string") > result.show() > {code} -- This message was sent by Atlassian Jira (v8.20.7#820007) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org