Chungmin created SPARK-35287:
--------------------------------

             Summary: RemoveRedundantProjects removes non-redundant projects
                 Key: SPARK-35287
                 URL: https://issues.apache.org/jira/browse/SPARK-35287
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.1.1
            Reporter: Chungmin
RemoveRedundantProjects erroneously removes non-redundant projects that are required to convert rows coming from DataSourceV2ScanExec to UnsafeRow. There is code handling this case, but it only inspects the project's immediate child. The bug occurs when DataSourceV2ScanExec is not a child of the project but a deeper descendant. The method {{isRedundant}} in {{RemoveRedundantProjects}} should be updated to account for descendants as well (a sketch of such a check follows the reproduction below).

The original scenario requires Iceberg to reproduce the issue. In theory, it should be possible to reproduce the bug with Spark SQL alone; someone more familiar with Spark SQL internals should be able to construct such a scenario. The following is my reproduction (Spark 3.1.1, Iceberg 0.11.1):

{code:java}
import scala.collection.JavaConverters._

import org.apache.iceberg.{PartitionSpec, TableProperties}
import org.apache.iceberg.hadoop.HadoopTables
import org.apache.iceberg.spark.SparkSchemaUtil
import org.apache.spark.sql.{DataFrame, QueryTest, SparkSession}
import org.apache.spark.sql.internal.SQLConf

class RemoveRedundantProjectsTest extends QueryTest {
  override val spark: SparkSession = SparkSession
    .builder()
    .master("local[4]")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .appName(suiteName)
    .getOrCreate()

  test("RemoveRedundantProjects removes non-redundant projects") {
    // Disable broadcast joins (forcing a sort-merge join) and whole-stage
    // codegen, and enable the rule under test.
    withSQLConf(
        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
        SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
        SQLConf.REMOVE_REDUNDANT_PROJECTS_ENABLED.key -> "true") {
      withTempDir { dir =>
        val path = dir.getCanonicalPath
        val data: DataFrame = spark.range(3).toDF
        val table = new HadoopTables().create(
          SparkSchemaUtil.convert(data.schema),
          PartitionSpec.unpartitioned(),
          Map(TableProperties.WRITE_NEW_DATA_LOCATION -> path).asJava,
          path)
        data.write.format("iceberg").mode("overwrite").save(path)
        table.refresh()

        // Self-join on an Iceberg table; the filter sits between the
        // project and the DataSourceV2 scan, so the scan is a descendant
        // rather than a direct child of the project.
        val df = spark.read.format("iceberg").load(path)
        val dfX = df.as("x")
        val dfY = df.as("y")
        val join = dfX.filter(dfX("id") > 0).join(dfY, "id")
        join.explain("extended")
        assert(join.count() == 2)
      }
    }
  }
}
{code}
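For reference, here is a minimal sketch of the kind of descendant-aware check described above. It is an illustration only, not the actual Spark patch; the helper name {{producesRowsNeedingConversion}} is hypothetical. It uses {{TreeNode.find}} to search the whole subtree instead of only the project's immediate child:

{code:java}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase

// Hypothetical helper, not the actual fix: returns true if any descendant
// of `plan` is a non-columnar DataSourceV2 scan, i.e. a scan whose rows may
// still need a project to be converted to UnsafeRow. TreeNode.find
// traverses the entire subtree, so the check is no longer limited to the
// direct child.
def producesRowsNeedingConversion(plan: SparkPlan): Boolean = {
  plan.find {
    case d: DataSourceV2ScanExecBase => !d.supportsColumnar
    case _ => false
  }.isDefined
}
{code}

A check like this is deliberately conservative: it may keep a project that is in fact redundant (for example, when an intermediate operator between the project and the scan already produces UnsafeRow), but erring in that direction preserves correctness.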