[ https://issues.apache.org/jira/browse/SPARK-47650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
yinan zhan updated SPARK-47650:
-------------------------------
Description:

The data in Partition 1 was executed twice.

import json
import os
import time

from pyspark.sql import SparkSession

num_gpus = 8

spark = SparkSession.builder \
    .appName("llm hard negs records") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "40g") \
    .config("spark.local.dir", "/tmp/pyspark") \
    .master(f"local[{num_gpus}]") \
    .getOrCreate()


def process_partition(index, partition):
    device_id = index % num_gpus
    device = f"cuda:{device_id}"
    print(device)
    time.sleep(10)
    results = []
    s = 0
    for row in partition:
        results.append((row['Query'], row['Hard Negative Document'],
                        row['Positive Document'], "C"))
        s += 1
    print(str(index) + "cool" + str(s))
    return results


def generate_fake_data(num_records, output_file_path):
    fake_data = [{
        "Query": f"Query {i}",
        "Hard Negative Document": f"Hard Negative Document {i}",
        "Positive Document": f"Positive Document {i}"
    } for i in range(num_records)]
    os.makedirs(os.path.dirname(output_file_path), exist_ok=True)
    with open(output_file_path, 'w') as f:
        for item in fake_data:
            f.write(json.dumps(item) + '\n')


num_records = 2000
file_path = '/tmp/fake_input_data.jsonl'
generate_fake_data(num_records, file_path)

df = spark.read.json(file_path).repartition(num_gpus)
results_rdd = df.rdd.mapPartitionsWithIndex(process_partition)
results_df = results_rdd.toDF(["Query", "Hard Negative Document", "Positive Document", "Result"])

output_path = "/tmp/bc_inputs6"
results_df.write.json(output_path, mode="overwrite")

(was: the same reproduction script, without the leading sentence)


> In local mode, Spark DataFrame cannot fully parallelize
> --------------------------------------------------------
>
>                 Key: SPARK-47650
>                 URL: https://issues.apache.org/jira/browse/SPARK-47650
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark, SQL
>    Affects Versions: 3.5.1
>            Reporter: yinan zhan
>            Priority: Minor
>
> The data in Partition 1 was executed twice.
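A plausible explanation for the extra pass (an assumption on my part, not stated in the ticket) is that RDD.toDF() without an explicit schema has to infer column types, which launches a separate sampling job over the first non-empty partition before the write job evaluates all partitions. A minimal sketch of a workaround under that assumption, supplying an explicit StructType so no inference pass is needed:

from pyspark.sql.types import StructType, StructField, StringType

# Assumption: the duplicate execution of the first partition is caused by
# schema inference in toDF(); an explicit schema lets Spark skip the extra
# sampling job, so process_partition should run exactly once per partition.
schema = StructType([
    StructField("Query", StringType()),
    StructField("Hard Negative Document", StringType()),
    StructField("Positive Document", StringType()),
    StructField("Result", StringType()),
])

results_df = spark.createDataFrame(results_rdd, schema=schema)
results_df.write.json(output_path, mode="overwrite")

If the double execution disappears with the explicit schema, the behaviour is schema inference rather than a scheduling problem in local mode.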