Niranda Perera created SPARK-40233:
--------------------------------------

             Summary: Unable to load large pandas dataframe to pyspark
                 Key: SPARK-40233
                 URL: https://issues.apache.org/jira/browse/SPARK-40233
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 3.3.0
            Reporter: Niranda Perera
I've been trying to join two large pandas dataframes with PySpark using the code below. I'm varying the number of executor cores allocated to the application to measure PySpark's scalability (strong scaling).
{code:python}
import gc
import math
import time

import pandas as pd
from numpy.random import default_rng
from pyspark.sql import SparkSession

r = 1000000000  # 1Bn rows
it = 10
w = 256
unique = 0.9
TOTAL_MEM = 240
TOTAL_NODES = 14

max_val = r * unique
rng = default_rng()
frame_data = rng.integers(0, max_val, size=(r, 2))
frame_data1 = rng.integers(0, max_val, size=(r, 2))
print("data generated", flush=True)

df_l = pd.DataFrame(frame_data).add_prefix("col")
df_r = pd.DataFrame(frame_data1).add_prefix("col")
print("data loaded", flush=True)

procs = int(math.ceil(w / TOTAL_NODES))
mem = int(TOTAL_MEM * 0.9)
print(f"world sz {w} procs per worker {procs} mem {mem} iter {it}", flush=True)

spark = SparkSession \
    .builder \
    .appName(f'join {r} {w}') \
    .master('spark://node:7077') \
    .config('spark.executor.memory', f'{int(mem * 0.6)}g') \
    .config('spark.executor.pyspark.memory', f'{int(mem * 0.4)}g') \
    .config('spark.cores.max', w) \
    .config('spark.driver.memory', '100g') \
    .config('spark.sql.execution.arrow.pyspark.enabled', 'true') \
    .getOrCreate()

sdf0 = spark.createDataFrame(df_l).repartition(w).cache()
sdf1 = spark.createDataFrame(df_r).repartition(w).cache()
print("data loaded to spark", flush=True)

try:
    for i in range(it):
        t1 = time.time()
        out = sdf0.join(sdf1, on='col0', how='inner')
        count = out.count()
        t2 = time.time()
        print(f"timings {r} 1 {i} {(t2 - t1) * 1000:.0f} ms, {count}", flush=True)

        del out
        del count
        gc.collect()
finally:
    spark.stop()
{code}

{*}Cluster{*}: I am using a standalone Spark cluster of 15 nodes, each with 48 cores and 240 GB RAM. The master and the driver code run on node1, while the other 14 nodes run workers with maximum memory allocated. In the Spark session, I reserve 90% of each node's memory for the executor, splitting it 60% to the JVM and 40% to PySpark.

{*}Issue{*}: When I run the above program, I can see that executors are assigned to the app, but it makes no progress, even after 60 minutes. For a smaller row count (10M), this worked without a problem.

Driver output:
{code}
world sz 256 procs per worker 19 mem 216 iter 8
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/08/26 14:52:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
/N/u/d/dnperera/.conda/envs/env/lib/python3.8/site-packages/pyspark/sql/pandas/conversion.py:425: UserWarning: createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true; however, failed by the reason below:
  Negative initial size: -589934400
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)
{code}
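The "Negative initial size: -589934400" warning suggests an integer overflow while a very large Arrow batch is being serialized. As a hedged sketch (untested, and not part of the original run): feeding each pandas dataframe to Spark in smaller slices may keep individual Arrow batches small enough to avoid the overflow. The {{load_in_chunks}} helper and the {{chunk_rows}} value below are illustrative assumptions, not an established fix.
{code:python}
from functools import reduce

def load_in_chunks(spark, pdf, chunk_rows=10_000_000):
    """Create one Spark DataFrame per pandas slice and union them by column name."""
    parts = [
        spark.createDataFrame(pdf.iloc[start:start + chunk_rows])
        for start in range(0, len(pdf), chunk_rows)
    ]
    return reduce(lambda a, b: a.unionByName(b), parts)

# Hypothetical replacement for the original calls:
# sdf0 = load_in_chunks(spark, df_l).repartition(w).cache()
# sdf1 = load_in_chunks(spark, df_r).repartition(w).cache()
{code}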