Niranda Perera created SPARK-40233:
--------------------------------------

             Summary: Unable to load large pandas dataframe to pyspark
                 Key: SPARK-40233
                 URL: https://issues.apache.org/jira/browse/SPARK-40233
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 3.3.0
            Reporter: Niranda Perera


I've been trying to join two large pandas dataframes with PySpark using the 
following code, varying the number of executor cores allocated to the 
application to measure PySpark's strong scaling.
{code:python}
import gc
import math
import time

import pandas as pd
from numpy.random import default_rng
from pyspark.sql import SparkSession

r = 1000000000  # 1Bn rows
it = 10
w = 256
unique = 0.9

TOTAL_MEM = 240
TOTAL_NODES = 14

max_val = int(r * unique)
rng = default_rng()
frame_data = rng.integers(0, max_val, size=(r, 2)) 
frame_data1 = rng.integers(0, max_val, size=(r, 2)) 
print(f"data generated", flush=True)

df_l = pd.DataFrame(frame_data).add_prefix("col")
df_r = pd.DataFrame(frame_data1).add_prefix("col")
print(f"data loaded", flush=True)


procs = int(math.ceil(w / TOTAL_NODES))
mem = int(TOTAL_MEM*0.9)
print(f"world sz {w} procs per worker {procs} mem {mem} iter {it}", flush=True)

spark = SparkSession\
    .builder\
    .appName(f'join {r} {w}')\
    .master('spark://node:7077')\
    .config('spark.executor.memory', f'{int(mem*0.6)}g')\
    .config('spark.executor.pyspark.memory', f'{int(mem*0.4)}g')\
    .config('spark.cores.max', w)\
    .config('spark.driver.memory', '100g')\
    .config('spark.sql.execution.arrow.pyspark.enabled', 'true')\
    .getOrCreate()

sdf0 = spark.createDataFrame(df_l).repartition(w).cache()
sdf1 = spark.createDataFrame(df_r).repartition(w).cache()
print(f"data loaded to spark", flush=True)

try:           
    for i in range(it):
        t1 = time.time()
        out = sdf0.join(sdf1, on='col0', how='inner')
        count = out.count()
        t2 = time.time()
        print(f"timings {r} 1 {i} {(t2 - t1) * 1000:.0f} ms, {count}", flush=True)
        
        del out
        del count
        gc.collect()
finally:
    spark.stop() {code}
{*}Cluster{*}: I am using a standalone Spark cluster of 15 nodes, each with 48 
cores and 240GB RAM. The master and the driver code run on node1, while the 
other 14 nodes run workers allocated the maximum memory. In the Spark config, I 
reserve 90% of total memory for the executors, splitting it 60% to the JVM and 
40% to PySpark.
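For reference, the arithmetic in that description works out as follows (a quick sanity check using the constants from the snippet above; the resulting `procs per worker 19` and `mem 216` figures match the driver output):

```python
import math

TOTAL_MEM = 240    # GB of RAM per node
TOTAL_NODES = 14   # worker nodes (node1 hosts the master and driver)
w = 256            # spark.cores.max across the cluster

procs = math.ceil(w / TOTAL_NODES)  # executor cores landing on each worker
mem = int(TOTAL_MEM * 0.9)          # 90% of node memory reserved for Spark
jvm_mem = int(mem * 0.6)            # -> spark.executor.memory = 129g
py_mem = int(mem * 0.4)             # -> spark.executor.pyspark.memory = 86g

print(procs, mem, jvm_mem, py_mem)  # 19 216 129 86
```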

{*}Issue{*}: When I run the above program, I can see that the executors are 
being assigned to the app, but it makes no progress even after 60 minutes. With 
a smaller row count (10M), this worked without a problem. Driver output:
{code}
world sz 256 procs per worker 19 mem 216 iter 8
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/08/26 14:52:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
/N/u/d/dnperera/.conda/envs/env/lib/python3.8/site-packages/pyspark/sql/pandas/conversion.py:425: UserWarning: createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true; however, failed by the reason below:
  Negative initial size: -589934400
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg) {code}
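The `Negative initial size: -589934400` value looks like a signed 32-bit overflow: a single int64 column of 1 Bn rows is about 8 GB of buffer, which wraps negative if the byte count is ever stored in a 32-bit Java int. A quick check (the extra 192 bytes here are presumably Arrow buffer padding or metadata; that detail is my guess):

```python
def as_int32(n):
    """Interpret an arbitrary integer as a wrapped signed 32-bit value."""
    n &= 0xFFFFFFFF
    return n - (1 << 32) if n >= (1 << 31) else n

rows = 1_000_000_000
buf_bytes = rows * 8 + 192  # one int64 column buffer; +192 assumed padding
print(as_int32(buf_bytes))  # -589934400, matching the warning
```

If that reading is right, any code path that pushes an entire column through a single Arrow buffer will overflow past 2 GiB, which would also explain why the 10M-row case works.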

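Assuming the failure really is an oversized Arrow batch (an inference from the negative size, not confirmed), a possible workaround is to hand the pandas frame to Spark in slices and union the pieces, so each conversion stays well below 2 GiB. An untested sketch with a hypothetical helper name:

```python
from functools import reduce

def create_df_chunked(spark, pdf, rows_per_chunk=50_000_000):
    """Convert a large pandas DataFrame to Spark in slices, then union.

    Hypothetical helper; rows_per_chunk should keep each slice's Arrow
    buffers comfortably below the 2 GiB signed-int limit.
    """
    parts = [spark.createDataFrame(pdf.iloc[i:i + rows_per_chunk])
             for i in range(0, len(pdf), rows_per_chunk)]
    return reduce(lambda a, b: a.union(b), parts)
```

This would replace the direct calls above, e.g. `create_df_chunked(spark, df_l).repartition(w).cache()`.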


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
