Repository: spark Updated Branches: refs/heads/master 626b4cafc -> ad3cc1312
[SPARK-20245][SQL][MINOR] pass output to LogicalRelation directly ## What changes were proposed in this pull request? Currently `LogicalRelation` has a `expectedOutputAttributes` parameter, which makes it hard to reason about what the actual output is. Like other leaf nodes, `LogicalRelation` should also take `output` as a parameter, to simplify the logic ## How was this patch tested? existing tests Author: Wenchen Fan <wenc...@databricks.com> Closes #17552 from cloud-fan/minor. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad3cc131 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad3cc131 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad3cc131 Branch: refs/heads/master Commit: ad3cc1312db3b5667cea134940a09896a4609b74 Parents: 626b4ca Author: Wenchen Fan <wenc...@databricks.com> Authored: Fri Apr 7 15:58:50 2017 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Fri Apr 7 15:58:50 2017 +0800 ---------------------------------------------------------------------- .../spark/sql/catalyst/catalog/interface.scala | 8 ++-- .../datasources/DataSourceStrategy.scala | 15 ++++---- .../execution/datasources/LogicalRelation.scala | 39 +++++++------------- .../datasources/PruneFileSourcePartitions.scala | 4 +- .../spark/sql/sources/PathOptionSuite.scala | 19 +++++----- .../spark/sql/hive/HiveMetastoreCatalog.scala | 13 +++++-- .../spark/sql/hive/CachedTableSuite.scala | 4 +- .../PruneFileSourcePartitionsSuite.scala | 2 +- 8 files changed, 49 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ad3cc131/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index dc2e404..360e55d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -27,7 +27,7 @@ import com.google.common.base.Objects import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Cast, Literal} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, Literal} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.catalyst.util.quoteIdentifier @@ -403,14 +403,14 @@ object CatalogTypes { */ case class CatalogRelation( tableMeta: CatalogTable, - dataCols: Seq[Attribute], - partitionCols: Seq[Attribute]) extends LeafNode with MultiInstanceRelation { + dataCols: Seq[AttributeReference], + partitionCols: Seq[AttributeReference]) extends LeafNode with MultiInstanceRelation { assert(tableMeta.identifier.database.isDefined) assert(tableMeta.partitionSchema.sameType(partitionCols.toStructType)) assert(tableMeta.dataSchema.sameType(dataCols.toStructType)) // The partition column should always appear after data columns. - override def output: Seq[Attribute] = dataCols ++ partitionCols + override def output: Seq[AttributeReference] = dataCols ++ partitionCols def isPartitioned: Boolean = partitionCols.nonEmpty http://git-wip-us.apache.org/repos/asf/spark/blob/ad3cc131/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index e5c7c38..2d83d51 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -231,16 +231,17 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] options = table.storage.properties ++ pathOption, catalogTable = Some(table)) - LogicalRelation( - dataSource.resolveRelation(checkFilesExist = false), - catalogTable = Some(table)) + LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table) } }).asInstanceOf[LogicalRelation] - // It's possible that the table schema is empty and need to be inferred at runtime. We should - // not specify expected outputs for this case. - val expectedOutputs = if (r.output.isEmpty) None else Some(r.output) - plan.copy(expectedOutputAttributes = expectedOutputs) + if (r.output.isEmpty) { + // It's possible that the table schema is empty and need to be inferred at runtime. For this + // case, we don't need to change the output of the cached plan. + plan + } else { + plan.copy(output = r.output) + } } override def apply(plan: LogicalPlan): LogicalPlan = plan transform { http://git-wip-us.apache.org/repos/asf/spark/blob/ad3cc131/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala index 3b14b79..4215203 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference} +import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.BaseRelation @@ -26,31 +26,13 @@ import org.apache.spark.util.Utils /** * Used to link a [[BaseRelation]] in to a logical query plan. - * - * Note that sometimes we need to use `LogicalRelation` to replace an existing leaf node without - * changing the output attributes' IDs. The `expectedOutputAttributes` parameter is used for - * this purpose. See https://issues.apache.org/jira/browse/SPARK-10741 for more details. */ case class LogicalRelation( relation: BaseRelation, - expectedOutputAttributes: Option[Seq[Attribute]] = None, - catalogTable: Option[CatalogTable] = None) + output: Seq[AttributeReference], + catalogTable: Option[CatalogTable]) extends LeafNode with MultiInstanceRelation { - override val output: Seq[AttributeReference] = { - val attrs = relation.schema.toAttributes - expectedOutputAttributes.map { expectedAttrs => - assert(expectedAttrs.length == attrs.length) - attrs.zip(expectedAttrs).map { - // We should respect the attribute names provided by base relation and only use the - // exprId in `expectedOutputAttributes`. - // The reason is that, some relations(like parquet) will reconcile attribute names to - // workaround case insensitivity issue. - case (attr, expected) => attr.withExprId(expected.exprId) - } - }.getOrElse(attrs) - } - // Logical Relations are distinct if they have different output for the sake of transformations. override def equals(other: Any): Boolean = other match { case l @ LogicalRelation(otherRelation, _, _) => relation == otherRelation && output == l.output @@ -87,11 +69,8 @@ case class LogicalRelation( * unique expression ids. We respect the `expectedOutputAttributes` and create * new instances of attributes in it. */ - override def newInstance(): this.type = { - LogicalRelation( - relation, - expectedOutputAttributes.map(_.map(_.newInstance())), - catalogTable).asInstanceOf[this.type] + override def newInstance(): LogicalRelation = { + this.copy(output = output.map(_.newInstance())) } override def refresh(): Unit = relation match { @@ -101,3 +80,11 @@ case class LogicalRelation( override def simpleString: String = s"Relation[${Utils.truncatedString(output, ",")}] $relation" } + +object LogicalRelation { + def apply(relation: BaseRelation): LogicalRelation = + LogicalRelation(relation, relation.schema.toAttributes, None) + + def apply(relation: BaseRelation, table: CatalogTable): LogicalRelation = + LogicalRelation(relation, relation.schema.toAttributes, Some(table)) +} http://git-wip-us.apache.org/repos/asf/spark/blob/ad3cc131/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index 8566a80..905b868 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -59,9 +59,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq) val prunedFsRelation = fsRelation.copy(location = prunedFileIndex)(sparkSession) - val prunedLogicalRelation = logicalRelation.copy( - relation = prunedFsRelation, - expectedOutputAttributes = Some(logicalRelation.output)) + val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation) // Keep partition-pruning predicates so that they are visible in physical planning val filterExpression = filters.reduceLeft(And) http://git-wip-us.apache.org/repos/asf/spark/blob/ad3cc131/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala index 60adee4..6dd4847 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala @@ -75,13 +75,13 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext { |USING ${classOf[TestOptionsSource].getCanonicalName} |OPTIONS (PATH '/tmp/path') """.stripMargin) - assert(getPathOption("src") == Some("file:/tmp/path")) + assert(getPathOption("src").map(makeQualifiedPath) == Some(makeQualifiedPath("/tmp/path"))) } // should exist even path option is not specified when creating table withTable("src") { sql(s"CREATE TABLE src(i int) USING ${classOf[TestOptionsSource].getCanonicalName}") - assert(getPathOption("src") == Some(CatalogUtils.URIToString(defaultTablePath("src")))) + assert(getPathOption("src").map(makeQualifiedPath) == Some(defaultTablePath("src"))) } } @@ -95,9 +95,9 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext { |OPTIONS (PATH '$p') |AS SELECT 1 """.stripMargin) - assert(CatalogUtils.stringToURI( - spark.table("src").schema.head.metadata.getString("path")) == - makeQualifiedPath(p.getAbsolutePath)) + assert( + spark.table("src").schema.head.metadata.getString("path") == + p.getAbsolutePath) } } @@ -109,8 +109,9 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext { |USING ${classOf[TestOptionsSource].getCanonicalName} |AS SELECT 1 """.stripMargin) - assert(spark.table("src").schema.head.metadata.getString("path") == - CatalogUtils.URIToString(defaultTablePath("src"))) + assert( + makeQualifiedPath(spark.table("src").schema.head.metadata.getString("path")) == + defaultTablePath("src")) } } @@ -122,13 +123,13 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext { |USING ${classOf[TestOptionsSource].getCanonicalName} |OPTIONS (PATH '/tmp/path')""".stripMargin) sql("ALTER TABLE src SET LOCATION '/tmp/path2'") - assert(getPathOption("src") == Some("/tmp/path2")) + assert(getPathOption("src").map(makeQualifiedPath) == Some(makeQualifiedPath("/tmp/path2"))) } withTable("src", "src2") { sql(s"CREATE TABLE src(i int) USING ${classOf[TestOptionsSource].getCanonicalName}") sql("ALTER TABLE src RENAME TO src2") - assert(getPathOption("src2") == Some(CatalogUtils.URIToString(defaultTablePath("src2")))) + assert(getPathOption("src2").map(makeQualifiedPath) == Some(defaultTablePath("src2"))) } } http://git-wip-us.apache.org/repos/asf/spark/blob/ad3cc131/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 10f4325..6b98066 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -175,7 +175,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log bucketSpec = None, fileFormat = fileFormat, options = options)(sparkSession = sparkSession) - val created = LogicalRelation(fsRelation, catalogTable = Some(updatedTable)) + val created = LogicalRelation(fsRelation, updatedTable) tableRelationCache.put(tableIdentifier, created) created } @@ -203,7 +203,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log bucketSpec = None, options = options, className = fileType).resolveRelation(), - catalogTable = Some(updatedTable)) + table = updatedTable) tableRelationCache.put(tableIdentifier, created) created @@ -212,7 +212,14 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log logicalRelation }) } - result.copy(expectedOutputAttributes = Some(relation.output)) + // The inferred schema may have different filed names as the table schema, we should respect + // it, but also respect the exprId in table relation output. + assert(result.output.length == relation.output.length && + result.output.zip(relation.output).forall { case (a1, a2) => a1.dataType == a2.dataType }) + val newOutput = result.output.zip(relation.output).map { + case (a1, a2) => a1.withExprId(a2.exprId) + } + result.copy(output = newOutput) } private def inferIfNeeded( http://git-wip-us.apache.org/repos/asf/spark/blob/ad3cc131/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index 2b3f360..d3cbf89 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -329,7 +329,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto fileFormat = new ParquetFileFormat(), options = Map.empty)(sparkSession = spark) - val plan = LogicalRelation(relation, catalogTable = Some(tableMeta)) + val plan = LogicalRelation(relation, tableMeta) spark.sharedState.cacheManager.cacheQuery(Dataset.ofRows(spark, plan)) assert(spark.sharedState.cacheManager.lookupCachedData(plan).isDefined) @@ -342,7 +342,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto bucketSpec = None, fileFormat = new ParquetFileFormat(), options = Map.empty)(sparkSession = spark) - val samePlan = LogicalRelation(sameRelation, catalogTable = Some(tableMeta)) + val samePlan = LogicalRelation(sameRelation, tableMeta) assert(spark.sharedState.cacheManager.lookupCachedData(samePlan).isDefined) } http://git-wip-us.apache.org/repos/asf/spark/blob/ad3cc131/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala index cd8f94b..f818e29 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala @@ -58,7 +58,7 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te fileFormat = new ParquetFileFormat(), options = Map.empty)(sparkSession = spark) - val logicalRelation = LogicalRelation(relation, catalogTable = Some(tableMeta)) + val logicalRelation = LogicalRelation(relation, tableMeta) val query = Project(Seq('i, 'p), Filter('p === 1, logicalRelation)).analyze val optimized = Optimize.execute(query) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org