[ 
https://issues.apache.org/jira/browse/SPARK-39290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

YuNing Liu updated SPARK-39290:
-------------------------------
    Description: 
My program runs on Spark on YARN with four nodes; both the number of executors and the cores per executor are set to 4. When I use "df.groupBy().applyInPandas" to process my DataFrame, the work is always split into two jobs: the first job contains only one task, and the second contains three. The two jobs have identical DAGs and perform exactly the same operations, only on different data, so the execution time of my program is roughly doubled. I would expect a single job with four tasks. This problem has bothered me for a long time and I cannot find the reason.

The code below is the simple example I use to reproduce the issue. My DataFrame holds metadata about image data stored on HDFS, with three columns: "id", "path", and "category". "id" indicates the node on which the image is located, "path" is the full path of the image, and "category" is the image class. Job 1 and Job 2 in the attached WebUI screenshot correspond to the two jobs described above.
{code:python}
import os

import pandas as pd
from pyspark.sql import SparkSession
from pyhdfs import HdfsClient

spark = SparkSession.builder.appName("test") \
    .config("spark.sql.shuffle.partitions", "40") \
    .config("spark.default.parallelism", "40") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()


def process(key, paths):
    # Called once per "id" group: count the rows in the group, then burn CPU
    # so that each task's runtime is clearly visible in the Web UI.
    a = len(paths.path)
    for i in range(100000000):
        a += 1
    return pd.DataFrame([key + (str(a),)])


if __name__ == '__main__':
    # List every image file under /Datasets on HDFS: one sub-directory per
    # node ("id"), one class directory per "category".
    client = HdfsClient(hosts="master:9870", user_name="hadoop")
    dataset_dir = "/Datasets"
    files_pd = pd.DataFrame()
    for slave, per_dataset_dir in enumerate(client.listdir(dataset_dir)):
        child_path = os.path.join(dataset_dir, per_dataset_dir)
        files = pd.DataFrame([
            [slave, os.path.join(str(child_path), str(child_dir_name), str(filename)), index]
            for index, child_dir_name in enumerate(client.listdir(child_path))
            for filename in client.listdir(os.path.join(child_path, child_dir_name))
        ])
        files_pd = pd.concat([files_pd, files])

    # Shuffle the rows, hand them to Spark, and apply `process` to each "id" group.
    files_pd = files_pd.sample(frac=1).reset_index(drop=True)
    spark_files = spark.createDataFrame(files_pd, ("id", "path", "category"))
    result = spark_files.groupby("id").applyInPandas(
        process, schema="id long, path string")
    result.show()
 {code}
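For reference (not part of the original report), here is a minimal, self-contained sketch of the same pattern with hypothetical in-memory data standing in for the HDFS listing. Comparing "show()" with a full action such as "count()" may help narrow down whether the one-task/three-task split comes from "applyInPandas" itself or from how the result is consumed.
{code:python}
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("applyInPandas-job-split").getOrCreate()


def process(key, pdf):
    # One output row per group: the group key and the number of rows in the group.
    return pd.DataFrame([key + (str(len(pdf)),)])


# Hypothetical in-memory data standing in for the HDFS file listing:
# four distinct "id" values, mirroring the four cluster nodes.
df = spark.createDataFrame(
    [(i % 4, "/Datasets/img_%d.png" % i, i % 10) for i in range(1000)],
    ("id", "path", "category"),
)

result = df.groupby("id").applyInPandas(process, schema="id long, path string")

print(result.rdd.getNumPartitions())  # partitions of the grouped result
result.show()                         # may compute only a subset of partitions first
print(result.count())                 # full action: forces every group to be computed
{code}
If the split into a one-task job plus a three-task job appears only with "show()" and not with "count()", that would at least localize where the second job is created.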



> Question of job division in "df.groupBy().applyInPandas"
> --------------------------------------------------------
>
>                 Key: SPARK-39290
>                 URL: https://issues.apache.org/jira/browse/SPARK-39290
>             Project: Spark
>          Issue Type: Question
>          Components: PySpark
>    Affects Versions: 3.2.1
>         Environment: python 3.8
> pyspark 3.2.1
> pyarrow 7.0.0
> hadoop 3.3.2
>            Reporter: YuNing Liu
>            Priority: Major
>         Attachments: WebUI.png
>
>


