[ 
https://issues.apache.org/jira/browse/SPARK-40441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-40441:
---------------------------------
    Component/s:     (was: Pandas API on Spark)

> With PANDAS_UDF, data from tasks on the same physical node is aggregated into 
> one task execution, resulting in concurrency not being fully utilized
> ---------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-40441
>                 URL: https://issues.apache.org/jira/browse/SPARK-40441
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Scheduler
>    Affects Versions: 2.4.4
>            Reporter: SimonAries
>            Priority: Major
>         Attachments: image-2022-09-15-14-28-04-332.png, 
> image-2022-09-15-14-29-35-004.png
>
>
> {code:java}
> # code placeholder
> import json
> import pandas as pd
> import pyspark.sql.functions as F
> import pyspark.sql.types as T
> import torch
> from pyspark import SparkConf
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import pandas_udf, PandasUDFType
> import argparse
> torch.set_num_threads(1)
> schema = T.StructType([T.StructField("topic_id", T.StringType(), True),
>                        T.StructField("topic_ht", T.StringType(), True)
>                        ])
> def parse_args():
>     parser = argparse.ArgumentParser()
>     parser.add_argument('--input_path', help='input path',
>                         default="./*", type=str)
>     parser.add_argument('--output_path', help='output path',
>                         default="./tmp_output", type=str)
>     parser.add_argument('--project_name', help='project name',
>                         default="tmp", type=str)
>     parser.add_argument('--calc_date', help='data partition (date)',
>                         default="2022-06-21", type=str)
>     parser.add_argument('--edition_codes', help='code',
>                         default="01,19", type=str)
>     parser.add_argument('--subject_code', help='code',
>                         default="02", type=str)
>     parser.add_argument('--phase_code', help='code',
>                         default="03", type=str)
>     return parser.parse_args()
> @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
> def generate_topic_ht(df):
>     from pycc.topic_ht_extractor import model_inference_engine
>     torch.set_num_threads(1)
>     engine = model_inference_engine("./pycc/model/", "./pycc/resource/")
>     df_res = pd.DataFrame(columns=["topic_id", "topic_ht"])
>     for i in range(len(df)):
>         topic_json_str = df.iloc[i:i + 1]["question_info"].values[0]
>         topic = json.loads(topic_json_str.strip())
>         topic_ht = engine.predict(topic)
>         df_res = df_res.append({"topic_id": topic["id"], "topic_ht": str(topic_ht)}, ignore_index=True)
>     return df_res
> if "__main__" == __name__:
>     conf = SparkConf() \
>         .setAppName("generate_topic_ht") \
>         .set("spark.sql.execution.arrow.enabled", "true")
>     args = parse_args()
>     spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
>     spark.sql("select * from {}.dwd_rencently_month_topic_theme_incr where part = '{}' "
>               "and business_type='xxj_map_topic_ht'".format(args.project_name, args.calc_date)) \
>         .repartition(20).groupby(F.spark_partition_id()) \
>         .apply(generate_topic_ht) \
>         .write.mode("overwrite") \
>         .parquet("/project/{}/{}/db/dws/dws_topic_ht_incr/part={}/business_type=xxj_map_topic_ht"
>                  .format(args.project_name, args.project_name, args.calc_date))
>     spark.sql("alter table {}.dws_topic_ht_incr drop if exists partition "
>               "(part='{}',business_type='xxj_map_topic_ht')".format(args.project_name, args.calc_date))
>     spark.sql("alter table {}.dws_topic_ht_incr add partition "
>               "(part='{}',business_type='xxj_map_topic_ht')".format(args.project_name, args.calc_date)) {code}
>  
> This causes very serious data skew, even though I repartitioned the data 
> before applying the UDF.
> !image-2022-09-15-14-29-35-004.png!
> !image-2022-09-15-14-28-04-332.png!
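A possible workaround, sketched here as an assumption rather than anything confirmed by the reporter: grouping by F.spark_partition_id() ties the number of GROUPED_MAP groups to how Spark happens to lay out partitions, so co-located partitions can collapse into one group. Assigning each row an explicit uniform random group key instead (e.g. `.withColumn("grp", (F.rand() * 20).cast("int")).groupby("grp").apply(generate_topic_ht)`) decouples the group count from partition placement. The key property, that a uniform random key spreads rows roughly evenly across a fixed number of groups, can be simulated in plain Python:

```python
import random

# Stand-in for the DataFrame rows; in PySpark the equivalent idea is
# .withColumn("grp", (F.rand() * num_groups).cast("int")).groupby("grp")
num_groups = 20
rows = list(range(10_000))

# Assign each row a uniform random group key in [0, num_groups).
groups = {}
for row in rows:
    key = random.randrange(num_groups)
    groups.setdefault(key, []).append(row)

sizes = [len(g) for g in groups.values()]
print(len(groups), min(sizes), max(sizes))
```

With 10,000 rows and 20 groups, every group ends up close to the expected 500 rows, so each group (and hence each pandas UDF invocation) becomes a comparably sized unit of work regardless of node placement.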



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
