[ https://issues.apache.org/jira/browse/SPARK-37321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Izek Greenfield updated SPARK-37321:
------------------------------------
    Component/s: SQL

> Wrong size estimation leads to "Cannot broadcast the table that is larger than 8GB: 8 GB"
> -----------------------------------------------------------------------------------------
>
>                 Key: SPARK-37321
>                 URL: https://issues.apache.org/jira/browse/SPARK-37321
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, SQL
>    Affects Versions: 3.1.1, 3.2.0
>            Reporter: Izek Greenfield
>            Priority: Major
>
> When CBO is enabled, Spark can attempt to broadcast a very large DataFrame because of a wrong output-size estimation.
>
> In `EstimationUtils.getSizePerRow`, when no statistics are available, Spark falls back to `DataType.defaultSize`. When the output contains `functions.concat_ws`, `getSizePerRow` therefore estimates each value at 20 bytes, while the actual value can be far larger.
> As a result, we sometimes end up with an estimated size below 300 KB while the actual size exceeds 8 GB. Spark concludes the table can be broadcast, only discovers at execution time that the data is too large, and fails with the exception above.
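> The mis-estimation can be sketched with plain arithmetic (a minimal illustration, not Spark's actual code path; the 20-byte figure is `StringType.defaultSize`, while the row count, column count, and per-value lengths below are hypothetical numbers chosen to mirror the reproduction):

```scala
// Stats-less fallback: every string column is charged StringType.defaultSize
// (20 bytes) per row, no matter how long the actual strings are.
val stringDefaultSize = 20L
val rows = 1000L          // hypothetical row count of the "small" join side
val stringColumns = 7L    // hypothetical: seven columns built with concat_ws

// What a getSizePerRow-style estimate yields for the string columns:
val estimatedBytes = rows * stringColumns * stringDefaultSize

// What the data can actually weigh when each concat_ws value joins
// 2801 groups of roughly 445 characters (~1.2 MB per value):
val actualBytesPerValue = 2801L * 445L
val actualBytes = rows * stringColumns * actualBytesPerValue

println(s"estimated: $estimatedBytes bytes")  // well under the broadcast threshold
println(s"actual:    $actualBytes bytes")     // over the 8 GB broadcast limit
```

> With the estimate at roughly 140 KB, the planner happily chooses a broadcast join, and the 8 GB hard limit is only hit at execution time.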
>
> Code sample to reproduce (imports for `Random`, `functions`, `col`, and `lit` added for completeness):
> {code:scala}
> import scala.util.Random
> import org.apache.spark.sql.functions
> import org.apache.spark.sql.functions.{col, lit}
> import spark.implicits._
>
> (1 to 100000).toDF("index").withColumn("index", col("index").cast("string")).write.parquet("/tmp/a")
> (1 to 1000).toDF("index_b").withColumn("index_b", col("index_b").cast("string")).write.parquet("/tmp/b")
>
> val a = spark.read
>   .parquet("/tmp/a")
>   .withColumn("b", col("index"))
>   .withColumn("l1", functions.concat_ws("/", col("index"), functions.current_date(), functions.current_date(), functions.current_date(), functions.current_date()))
>   .withColumn("l2", functions.concat_ws("/", col("index"), functions.current_date(), functions.current_date(), functions.current_date(), functions.current_date()))
>   .withColumn("l3", functions.concat_ws("/", col("index"), functions.current_date(), functions.current_date(), functions.current_date(), functions.current_date()))
>   .withColumn("l4", functions.concat_ws("/", col("index"), functions.current_date(), functions.current_date(), functions.current_date(), functions.current_date()))
>   .withColumn("l5", functions.concat_ws("/", col("index"), functions.current_date(), functions.current_date(), functions.current_date(), functions.current_date()))
>
> val r = Random.alphanumeric
> val l = 220
> val i = 2800
>
> val b = spark.read
>   .parquet("/tmp/b")
>   .withColumn("l1", functions.concat_ws("/", (0 to i).flatMap(a => List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
>   .withColumn("l2", functions.concat_ws("/", (0 to i).flatMap(a => List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
>   .withColumn("l3", functions.concat_ws("/", (0 to i).flatMap(a => List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
>   .withColumn("l4", functions.concat_ws("/", (0 to i).flatMap(a => List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
>   .withColumn("l5", functions.concat_ws("/", (0 to i).flatMap(a => List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
>   .withColumn("l6", functions.concat_ws("/", (0 to i).flatMap(a => List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
>   .withColumn("l7", functions.concat_ws("/", (0 to i).flatMap(a => List(col("index_b"), lit(r.take(l).mkString), lit(r.take(l).mkString))): _*))
>
> a.join(b, col("index") === col("index_b")).show(2000)
> {code}

--
This message was sent by Atlassian Jira
(v8.20.1#820001)