[ https://issues.apache.org/jira/browse/SPARK-40233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598550#comment-17598550 ]

Sean R. Owen commented on SPARK-40233:
--------------------------------------

That's what happens, right?
Spark is of course meant to read data sets directly, in parallel. In general you
don't load a data set on a single node and then ship it to Spark.
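
For a benchmark like this one, a rough sketch of generating both tables directly
on the executors, instead of materializing them in pandas on the driver first
(the session config is elided; r, w, max_val and the col0/col1 names just mirror
the reporter's script):

{code:java}
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("join-bench").getOrCreate()

r = 1000000000            # 1Bn rows, as in the report
w = 256                   # world size / target partition count
max_val = int(r * 0.9)    # 0.9 "unique" fraction from the report

def random_table(seed):
    # spark.range() creates the rows across w partitions on the executors;
    # rand() fills the columns there, so nothing is built on the driver.
    return (spark.range(0, r, numPartitions=w)
                 .select((F.rand(seed) * max_val).cast("long").alias("col0"),
                         (F.rand(seed + 1) * max_val).cast("long").alias("col1")))

sdf0 = random_table(42).cache()
sdf1 = random_table(7).cache()
print(sdf0.join(sdf1, on="col0", how="inner").count())
{code}

rand() won't reproduce the exact numpy distribution, but the ~16 GB of generated
data (1e9 rows x 2 int64 columns) is distributed from the start, so none of it
has to be Arrow-serialized through the driver.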

> Unable to load large pandas dataframe to pyspark
> ------------------------------------------------
>
>                 Key: SPARK-40233
>                 URL: https://issues.apache.org/jira/browse/SPARK-40233
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.3.0
>            Reporter: Niranda Perera
>            Priority: Major
>
> I've been trying to join two large pandas dataframes with PySpark using the
> following code. I vary the number of executor cores allocated to the
> application and measure PySpark's scalability (strong scaling).
> {code:java}
> import gc
> import math
> import time
>
> import pandas as pd
> from numpy.random import default_rng
> from pyspark.sql import SparkSession
>
> r = 1000000000 # 1Bn rows
> it = 10
> w = 256
> unique = 0.9
> TOTAL_MEM = 240
> TOTAL_NODES = 14
> max_val = int(r * unique)  # integer upper bound for the generated keys
> rng = default_rng()
> frame_data = rng.integers(0, max_val, size=(r, 2)) 
> frame_data1 = rng.integers(0, max_val, size=(r, 2)) 
> print(f"data generated", flush=True)
> df_l = pd.DataFrame(frame_data).add_prefix("col")
> df_r = pd.DataFrame(frame_data1).add_prefix("col")
> print(f"data loaded", flush=True)
> procs = int(math.ceil(w / TOTAL_NODES))
> mem = int(TOTAL_MEM*0.9)
> print(f"world sz {w} procs per worker {procs} mem {mem} iter {it}", 
> flush=True)
> spark = SparkSession\
>     .builder\
>     .appName(f'join {r} {w}')\
>     .master('spark://node:7077')\
>     .config('spark.executor.memory', f'{int(mem*0.6)}g')\
>     .config('spark.executor.pyspark.memory', f'{int(mem*0.4)}g')\
>     .config('spark.cores.max', w)\
>     .config('spark.driver.memory', '100g')\
>     .config('spark.sql.execution.arrow.pyspark.enabled', 'true')\
>     .getOrCreate()
> sdf0 = spark.createDataFrame(df_l).repartition(w).cache()
> sdf1 = spark.createDataFrame(df_r).repartition(w).cache()
> print(f"data loaded to spark", flush=True)
> try:           
>     for i in range(it):
>         t1 = time.time()
>         out = sdf0.join(sdf1, on='col0', how='inner')
>         count = out.count()
>         t2 = time.time()
>         print(f"timings {r} 1 {i} {(t2 - t1) * 1000:.0f} ms, {count}", 
> flush=True)
>         
>         del out
>         del count
>         gc.collect()
> finally:
>     spark.stop() {code}
> {*}Cluster{*}: I am using a standalone Spark cluster of 15 nodes, each with 48
> cores and 240GB RAM. The master and the driver code run on node1, while the
> other 14 nodes run workers that are allocated the maximum memory. In the Spark
> config I reserve 90% of the total memory for the executors, splitting it 60% to
> the JVM and 40% to PySpark.
> {*}Issue{*}: When I run the above program, I can see that executors are being
> assigned to the app, but it does not move forward even after 60 minutes. For a
> smaller row count (10M) this worked without a problem. Driver output:
> {code:java}
> world sz 256 procs per worker 19 mem 216 iter 8
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
> setLogLevel(newLevel).
> 22/08/26 14:52:22 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> /N/u/d/dnperera/.conda/envs/env/lib/python3.8/site-packages/pyspark/sql/pandas/conversion.py:425:
>  UserWarning: createDataFrame attempted Arrow optimization because 
> 'spark.sql.execution.arrow.pyspark.enabled' is set to true; however, failed 
> by the reason below:
>   Negative initial size: -589934400
> Attempting non-optimization as 
> 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
>   warn(msg) {code}
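
For what it's worth, the "Negative initial size: -589934400" in that fallback
warning looks like a byte-size counter overflowing a signed 32-bit integer
somewhere in the Arrow conversion path, which points the same way: the ~16 GB
pandas frame is too big to ship through the driver in one piece. If the data
really has to start life in pandas, one hedged workaround is to spill it to a
shared location as Parquet and let Spark read it back in parallel. The path and
chunk count below are illustrative only, and df_l / spark refer to the
reporter's script.

{code:java}
import numpy as np
import pandas as pd

# Illustrative path; it must be visible to every worker (NFS, HDFS, S3, ...).
PATH = "/shared/join_bench/left"

# Write the frame in modest chunks so no single file gets anywhere near 2 GiB.
for i, chunk in enumerate(np.array_split(df_l, 64)):
    chunk.to_parquet(f"{PATH}/part-{i:05d}.parquet", index=False)

# Spark then reads the directory in parallel; the 16 GB never funnels through the driver.
sdf0 = spark.read.parquet(PATH)
{code}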


