[ https://issues.apache.org/jira/browse/SPARK-40233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598566#comment-17598566 ]
Niranda Perera commented on SPARK-40233:
----------------------------------------

Well, the driver actually hangs after throwing that warning. Until you mentioned it here, I wasn't aware that it was a Python serialization problem. My initial thought was that, with Arrow enabled, Spark should not serialize anything, but rather copy the Arrow buffers from the driver to the executors.

> Unable to load large pandas dataframe to pyspark
> ------------------------------------------------
>
>                 Key: SPARK-40233
>                 URL: https://issues.apache.org/jira/browse/SPARK-40233
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.3.0
>            Reporter: Niranda Perera
>            Priority: Major
>
> I've been trying to join two large pandas dataframes using pyspark with the following code. I'm varying the executor cores allocated to the application and measuring the scalability of pyspark (strong scaling).
> {code:java}
> import gc
> import math
> import time
>
> import pandas as pd
> from numpy.random import default_rng
> from pyspark.sql import SparkSession
>
> r = 1000000000  # 1Bn rows
> it = 10
> w = 256
> unique = 0.9
> TOTAL_MEM = 240
> TOTAL_NODES = 14
>
> max_val = int(r * unique)
> rng = default_rng()
> frame_data = rng.integers(0, max_val, size=(r, 2))
> frame_data1 = rng.integers(0, max_val, size=(r, 2))
> print("data generated", flush=True)
>
> df_l = pd.DataFrame(frame_data).add_prefix("col")
> df_r = pd.DataFrame(frame_data1).add_prefix("col")
> print("data loaded", flush=True)
>
> procs = int(math.ceil(w / TOTAL_NODES))
> mem = int(TOTAL_MEM * 0.9)
> print(f"world sz {w} procs per worker {procs} mem {mem} iter {it}", flush=True)
>
> spark = SparkSession \
>     .builder \
>     .appName(f'join {r} {w}') \
>     .master('spark://node:7077') \
>     .config('spark.executor.memory', f'{int(mem * 0.6)}g') \
>     .config('spark.executor.pyspark.memory', f'{int(mem * 0.4)}g') \
>     .config('spark.cores.max', w) \
>     .config('spark.driver.memory', '100g') \
>     .config('spark.sql.execution.arrow.pyspark.enabled', 'true') \
>     .getOrCreate()
>
> sdf0 = spark.createDataFrame(df_l).repartition(w).cache()
> sdf1 = spark.createDataFrame(df_r).repartition(w).cache()
> print("data loaded to spark", flush=True)
>
> try:
>     for i in range(it):
>         t1 = time.time()
>         out = sdf0.join(sdf1, on='col0', how='inner')
>         count = out.count()
>         t2 = time.time()
>         print(f"timings {r} 1 {i} {(t2 - t1) * 1000:.0f} ms, {count}", flush=True)
>
>         del out
>         del count
>         gc.collect()
> finally:
>     spark.stop() {code}
> *Cluster*: I am using a standalone Spark cluster on 15 nodes, each with 48 cores and 240GB RAM. The master and the driver code run on node1, while the other 14 nodes run workers with maximum memory allocated. In the Spark context, I reserve 90% of total memory for the executors, split 60% to the JVM and 40% to pyspark.
> *Issue*: When I run the above program, I can see that the executors are being assigned to the app, but it doesn't move forward, even after 60 mins. For a smaller row count (10M), this worked without a problem.
> Driver output:
> {code:java}
> world sz 256 procs per worker 19 mem 216 iter 8
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
> 22/08/26 14:52:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform...
> using builtin-java classes where applicable
> /N/u/d/dnperera/.conda/envs/env/lib/python3.8/site-packages/pyspark/sql/pandas/conversion.py:425: UserWarning: createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true; however, failed by the reason below:
>   Negative initial size: -589934400
> Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
>   warn(msg) {code}
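A minimal workaround sketch, assuming the failure is a single oversized driver-side Arrow payload (the negative size reads like a signed 32-bit overflow, which is not confirmed here): hand the pandas frame to createDataFrame in slices, so each Arrow conversion stays well below the 2 GiB signed-int range. The helper name and slice size below are illustrative, not part of the reported setup; lowering 'spark.sql.execution.arrow.maxRecordsPerBatch' may be another knob worth trying.

{code:python}
# Hedged workaround sketch, not verified on this cluster: convert a large
# pandas DataFrame to Spark slice by slice, so that no single Arrow payload
# on the driver approaches the 2 GiB signed-int limit.
from functools import reduce


def create_spark_df_in_slices(spark, pdf, rows_per_slice=50_000_000):
    # rows_per_slice is an illustrative assumption; with two int64 columns,
    # 50M rows is roughly 800MB of Arrow buffers per slice.
    parts = [
        spark.createDataFrame(pdf.iloc[start:start + rows_per_slice])
        for start in range(0, len(pdf), rows_per_slice)
    ]
    # unionByName keeps the columns aligned across the per-slice DataFrames.
    return reduce(lambda a, b: a.unionByName(b), parts)


# Usage with the names from the repro above:
# sdf0 = create_spark_df_in_slices(spark, df_l).repartition(w).cache()
{code}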