[ https://issues.apache.org/jira/browse/SPARK-40441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon updated SPARK-40441:
---------------------------------
    Component/s:     (was: Pandas API on Spark)

> With PANDAS_UDF, data from tasks on the same physical node is aggregated into
> one task execution, resulting in concurrency not being fully utilized
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-40441
>                 URL: https://issues.apache.org/jira/browse/SPARK-40441
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Scheduler
>    Affects Versions: 2.4.4
>            Reporter: SimonAries
>            Priority: Major
>         Attachments: image-2022-09-15-14-28-04-332.png, image-2022-09-15-14-29-35-004.png
>
> {code:python}
> # code placeholder
> import json
> import argparse
>
> import pandas as pd
> import pyspark.sql.functions as F
> import pyspark.sql.types as T
> import torch
> from pyspark import SparkConf
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import pandas_udf, PandasUDFType
>
> torch.set_num_threads(1)
>
> schema = T.StructType([T.StructField("topic_id", T.StringType(), True),
>                        T.StructField("topic_ht", T.StringType(), True)])
>
>
> def parse_args():
>     parser = argparse.ArgumentParser()
>     parser.add_argument('--input_path', help='input path', default="./*", type=str)
>     parser.add_argument('--output_path', help='output path', default="./tmp_output", type=str)
>     parser.add_argument('--project_name', help='project name', default="tmp", type=str)
>     parser.add_argument('--calc_date', help='data partition', default="2022-06-21", type=str)
>     parser.add_argument('--edition_codes', help='code', default="01,19", type=str)
>     parser.add_argument('--subject_code', help='code', default="02", type=str)
>     parser.add_argument('--phase_code', help='code', default="03", type=str)
>     return parser.parse_args()
>
>
> @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
> def generate_topic_ht(df):
>     from pycc.topic_ht_extractor import model_inference_engine
>     torch.set_num_threads(1)
>     # one model instance is loaded per group, then applied row by row
>     engine = model_inference_engine("./pycc/model/", "./pycc/resource/")
>     df_res = pd.DataFrame(columns=["topic_id", "topic_ht"])
>     for i in range(len(df)):
>         topic_json_str = df.iloc[i:i + 1]["question_info"].values[0]
>         topic = json.loads(topic_json_str.strip())
>         topic_ht = engine.predict(topic)
>         df_res = df_res.append({"topic_id": topic["id"], "topic_ht": str(topic_ht)},
>                                ignore_index=True)
>     return df_res
>
>
> if __name__ == "__main__":
>     conf = SparkConf() \
>         .setAppName("generate_topic_ht") \
>         .set("spark.sql.execution.arrow.enabled", "true")
>     args = parse_args()
>     spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
>     # grouping on spark_partition_id() yields one group per input partition (20 here)
>     spark.sql("select * from {}.dwd_rencently_month_topic_theme_incr where part = '{}' "
>               "and business_type='xxj_map_topic_ht'".format(args.project_name, args.calc_date)) \
>         .repartition(20).groupby(F.spark_partition_id()) \
>         .apply(generate_topic_ht) \
>         .write.mode("overwrite") \
>         .parquet("/project/{}/{}/db/dws/dws_topic_ht_incr/part={}/business_type=xxj_map_topic_ht"
>                  .format(args.project_name, args.project_name, args.calc_date))
>     spark.sql("alter table {}.dws_topic_ht_incr drop if exists partition "
>               "(part='{}',business_type='xxj_map_topic_ht')".format(args.project_name, args.calc_date))
>     spark.sql("alter table {}.dws_topic_ht_incr add partition "
>               "(part='{}',business_type='xxj_map_topic_ht')".format(args.project_name, args.calc_date))
> {code}
>
> This causes very serious data skew, even though I repartition the data before applying the UDF.
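>
> A possible workaround, sketched below and untested: group on an explicit random salt column instead of F.spark_partition_id(). spark_partition_id() yields only as many distinct keys as there are input partitions (20 here), and when those few keys are hashed into reducer tasks, collisions can send several groups to the same task while other tasks sit idle, which would match the behavior above. NUM_GROUPS in the sketch is a hypothetical tuning value, not something from the original job.
>
> {code:python}
> # Untested sketch: salt the grouping key so groups spread across tasks
> # independently of physical partition placement. NUM_GROUPS is a
> # hypothetical value chosen only for illustration.
> import pyspark.sql.functions as F
>
> NUM_GROUPS = 200  # more distinct keys than shuffle partitions dilutes hash collisions
>
> df = spark.sql("select * from {}.dwd_rencently_month_topic_theme_incr where part = '{}' "
>                "and business_type='xxj_map_topic_ht'".format(args.project_name, args.calc_date))
>
> # rand() assigns each row an independent uniform value, so rows spread
> # across NUM_GROUPS groups regardless of which node produced them
> salted = df.withColumn("salt", (F.rand() * NUM_GROUPS).cast("int"))
> result = salted.groupby("salt").apply(generate_topic_ht)
> {code}
>
> Note the trade-off: each group still runs generate_topic_ht once, so the model would be loaded up to NUM_GROUPS times, which may matter for a heavy inference engine.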
> !image-2022-09-15-14-29-35-004.png!
> !image-2022-09-15-14-28-04-332.png!