Repository: spark
Updated Branches:
  refs/heads/master 299b43992 -> 418e5e4cb


[SPARK-10741] [SQL] Hive Query Having/OrderBy against Parquet table is not 
working

https://issues.apache.org/jira/browse/SPARK-10741
I chose the second approach: do not change the output exprIds when converting 
a MetastoreRelation to a LogicalRelation

Author: Wenchen Fan <cloud0...@163.com>

Closes #8889 from cloud-fan/hot-bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/418e5e4c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/418e5e4c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/418e5e4c

Branch: refs/heads/master
Commit: 418e5e4cbdaab87addb91ac0bb2245ff0213ac81
Parents: 299b439
Author: Wenchen Fan <cloud0...@163.com>
Authored: Sun Sep 27 09:08:38 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Sun Sep 27 09:08:38 2015 -0700

----------------------------------------------------------------------
 .../analysis/MultiInstanceRelation.scala        |  1 -
 .../catalyst/expressions/namedExpressions.scala |  8 +++
 .../scala/org/apache/spark/sql/DataFrame.scala  |  2 +-
 .../datasources/DataSourceStrategy.scala        | 18 +++---
 .../execution/datasources/LogicalRelation.scala | 33 +++++++---
 .../spark/sql/execution/datasources/rules.scala | 14 ++---
 .../parquet/ParquetFilterSuite.scala            |  2 +-
 .../ParquetPartitionDiscoverySuite.scala        |  2 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 64 +++++---------------
 .../spark/sql/hive/execution/commands.scala     |  2 +-
 .../sql/hive/InsertIntoHiveTableSuite.scala     |  4 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  2 +-
 .../sql/hive/execution/SQLQuerySuite.scala      | 27 ++++++++-
 .../apache/spark/sql/hive/parquetSuites.scala   |  8 +--
 .../sql/sources/hadoopFsRelationSuites.scala    |  2 +-
 15 files changed, 103 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/418e5e4c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
index 35b7402..394be47 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/418e5e4c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 6f173b5..5768c60 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -236,6 +236,14 @@ case class AttributeReference(
     }
   }
 
+  def withExprId(newExprId: ExprId): AttributeReference = {
+    if (exprId == newExprId) {
+      this
+    } else {
+      AttributeReference(name, dataType, nullable, metadata)(newExprId, 
qualifiers)
+    }
+  }
+
   override def toString: String = s"$name#${exprId.id}$typeSuffix"
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/418e5e4c/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index f9995da..9c67ad1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1595,7 +1595,7 @@ class DataFrame private[sql](
    */
   def inputFiles: Array[String] = {
     val files: Seq[String] = logicalPlan.collect {
-      case LogicalRelation(fsBasedRelation: FileRelation) =>
+      case LogicalRelation(fsBasedRelation: FileRelation, _) =>
         fsBasedRelation.inputFiles
       case fr: FileRelation =>
         fr.inputFiles

http://git-wip-us.apache.org/repos/asf/spark/blob/418e5e4c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index c582131..918db8e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -38,21 +38,21 @@ import org.apache.spark.util.{SerializableConfiguration, 
Utils}
  */
 private[sql] object DataSourceStrategy extends Strategy with Logging {
   def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: 
CatalystScan)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: 
CatalystScan, _)) =>
       pruneFilterProjectRaw(
         l,
         projects,
         filters,
         (a, f) => toCatalystRDD(l, a, t.buildScan(a, f))) :: Nil
 
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: 
PrunedFilteredScan)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: 
PrunedFilteredScan, _)) =>
       pruneFilterProject(
         l,
         projects,
         filters,
         (a, f) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f))) 
:: Nil
 
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: 
PrunedScan)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: 
PrunedScan, _)) =>
       pruneFilterProject(
         l,
         projects,
@@ -60,7 +60,7 @@ private[sql] object DataSourceStrategy extends Strategy with 
Logging {
         (a, _) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray))) :: 
Nil
 
     // Scanning partitioned HadoopFsRelation
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: 
HadoopFsRelation))
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: 
HadoopFsRelation, _))
         if t.partitionSpec.partitionColumns.nonEmpty =>
       val selectedPartitions = prunePartitions(filters, 
t.partitionSpec).toArray
 
@@ -88,7 +88,7 @@ private[sql] object DataSourceStrategy extends Strategy with 
Logging {
         selectedPartitions) :: Nil
 
     // Scanning non-partitioned HadoopFsRelation
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: 
HadoopFsRelation)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: 
HadoopFsRelation, _)) =>
       // See buildPartitionedTableScan for the reason that we need to create a 
shard
       // broadcast HadoopConf.
       val sharedHadoopConf = SparkHadoopUtil.get.conf
@@ -101,16 +101,16 @@ private[sql] object DataSourceStrategy extends Strategy 
with Logging {
         (a, f) =>
           toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f, t.paths, 
confBroadcast))) :: Nil
 
-    case l @ LogicalRelation(baseRelation: TableScan) =>
+    case l @ LogicalRelation(baseRelation: TableScan, _) =>
       execution.PhysicalRDD.createFromDataSource(
         l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: 
Nil
 
-    case i @ logical.InsertIntoTable(
-      l @ LogicalRelation(t: InsertableRelation), part, query, overwrite, 
false) if part.isEmpty =>
+    case i @ logical.InsertIntoTable(l @ LogicalRelation(t: 
InsertableRelation, _),
+      part, query, overwrite, false) if part.isEmpty =>
       execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: 
Nil
 
     case i @ logical.InsertIntoTable(
-      l @ LogicalRelation(t: HadoopFsRelation), part, query, overwrite, false) 
=>
+      l @ LogicalRelation(t: HadoopFsRelation, _), part, query, overwrite, 
false) =>
       val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
       execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode)) :: 
Nil
 

http://git-wip-us.apache.org/repos/asf/spark/blob/418e5e4c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index a7123dc..4069179 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -17,23 +17,40 @@
 package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{AttributeMap, 
AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
 import org.apache.spark.sql.sources.BaseRelation
 
 /**
  * Used to link a [[BaseRelation]] in to a logical query plan.
+ *
+ * Note that sometimes we need to use `LogicalRelation` to replace an existing 
leaf node without
+ * changing the output attributes' IDs.  The `expectedOutputAttributes` 
parameter is used for
+ * this purpose.  See https://issues.apache.org/jira/browse/SPARK-10741 for 
more details.
  */
-private[sql] case class LogicalRelation(relation: BaseRelation)
-  extends LeafNode
-  with MultiInstanceRelation {
+private[sql] case class LogicalRelation(
+    relation: BaseRelation,
+    expectedOutputAttributes: Option[Seq[Attribute]] = None)
+  extends LeafNode with MultiInstanceRelation {
 
-  override val output: Seq[AttributeReference] = relation.schema.toAttributes
+  override val output: Seq[AttributeReference] = {
+    val attrs = relation.schema.toAttributes
+    expectedOutputAttributes.map { expectedAttrs =>
+      assert(expectedAttrs.length == attrs.length)
+      attrs.zip(expectedAttrs).map {
+        // We should respect the attribute names provided by base relation and 
only use the
+        // exprId in `expectedOutputAttributes`.
+        // The reason is that, some relations(like parquet) will reconcile 
attribute names to
+        // workaround case insensitivity issue.
+        case (attr, expected) => attr.withExprId(expected.exprId)
+      }
+    }.getOrElse(attrs)
+  }
 
   // Logical Relations are distinct if they have different output for the sake 
of transformations.
   override def equals(other: Any): Boolean = other match {
-    case l @ LogicalRelation(otherRelation) => relation == otherRelation && 
output == l.output
-    case  _ => false
+    case l @ LogicalRelation(otherRelation, _) => relation == otherRelation && 
output == l.output
+    case _ => false
   }
 
   override def hashCode: Int = {
@@ -41,7 +58,7 @@ private[sql] case class LogicalRelation(relation: 
BaseRelation)
   }
 
   override def sameResult(otherPlan: LogicalPlan): Boolean = otherPlan match {
-    case LogicalRelation(otherRelation) => relation == otherRelation
+    case LogicalRelation(otherRelation, _) => relation == otherRelation
     case _ => false
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/418e5e4c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 16c9138..8efc801 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -37,7 +37,7 @@ private[sql] object PreInsertCastAndRename extends 
Rule[LogicalPlan] {
 
       // We are inserting into an InsertableRelation or HadoopFsRelation.
       case i @ InsertIntoTable(
-      l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation), _, 
child, _, _) => {
+      l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _), _, 
child, _, _) => {
         // First, make sure the data to be inserted have the same number of 
fields with the
         // schema of the relation.
         if (l.output.size != child.output.size) {
@@ -84,14 +84,14 @@ private[sql] case class PreWriteCheck(catalog: Catalog) 
extends (LogicalPlan =>
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
       case i @ logical.InsertIntoTable(
-        l @ LogicalRelation(t: InsertableRelation), partition, query, 
overwrite, ifNotExists) =>
+        l @ LogicalRelation(t: InsertableRelation, _), partition, query, 
overwrite, ifNotExists) =>
         // Right now, we do not support insert into a data source table with 
partition specs.
         if (partition.nonEmpty) {
           failAnalysis(s"Insert into a partition is not allowed because $l is 
not partitioned.")
         } else {
           // Get all input data source relations of the query.
           val srcRelations = query.collect {
-            case LogicalRelation(src: BaseRelation) => src
+            case LogicalRelation(src: BaseRelation, _) => src
           }
           if (srcRelations.contains(t)) {
             failAnalysis(
@@ -102,7 +102,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) 
extends (LogicalPlan =>
         }
 
       case logical.InsertIntoTable(
-        LogicalRelation(r: HadoopFsRelation), part, query, overwrite, _) =>
+        LogicalRelation(r: HadoopFsRelation, _), part, query, overwrite, _) =>
         // We need to make sure the partition columns specified by users do 
match partition
         // columns of the relation.
         val existingPartitionColumns = r.partitionColumns.fieldNames.toSet
@@ -120,7 +120,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) 
extends (LogicalPlan =>
 
         // Get all input data source relations of the query.
         val srcRelations = query.collect {
-          case LogicalRelation(src: BaseRelation) => src
+          case LogicalRelation(src: BaseRelation, _) => src
         }
         if (srcRelations.contains(r)) {
           failAnalysis(
@@ -148,10 +148,10 @@ private[sql] case class PreWriteCheck(catalog: Catalog) 
extends (LogicalPlan =>
           EliminateSubQueries(catalog.lookupRelation(tableIdent.toSeq)) match {
             // Only do the check if the table is a data source table
             // (the relation is a BaseRelation).
-            case l @ LogicalRelation(dest: BaseRelation) =>
+            case l @ LogicalRelation(dest: BaseRelation, _) =>
               // Get all input data source relations of the query.
               val srcRelations = query.collect {
-                case LogicalRelation(src: BaseRelation) => src
+                case LogicalRelation(src: BaseRelation, _) => src
               }
               if (srcRelations.contains(dest)) {
                 failAnalysis(

http://git-wip-us.apache.org/repos/asf/spark/blob/418e5e4c/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index f067112..45ad3fd 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -55,7 +55,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest 
with SharedSQLContex
         .where(Column(predicate))
 
       val analyzedPredicate = query.queryExecution.optimizedPlan.collect {
-        case PhysicalOperation(_, filters, LogicalRelation(_: 
ParquetRelation)) => filters
+        case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation, 
_)) => filters
       }.flatten
       assert(analyzedPredicate.nonEmpty)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/418e5e4c/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 7bac860..3a23b8e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -465,7 +465,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with 
ParquetTest with Sha
       (1 to 10).map(i => (i, i.toString)).toDF("a", 
"b").write.parquet(dir.getCanonicalPath)
       val queryExecution = 
sqlContext.read.parquet(dir.getCanonicalPath).queryExecution
       queryExecution.analyzed.collectFirst {
-        case LogicalRelation(relation: ParquetRelation) =>
+        case LogicalRelation(relation: ParquetRelation, _) =>
           assert(relation.partitionSpec === PartitionSpec.emptySpec)
       }.getOrElse {
         fail(s"Expecting a ParquetRelation2, but got:\n$queryExecution")

http://git-wip-us.apache.org/repos/asf/spark/blob/418e5e4c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 012634c..ea1521a 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -448,7 +448,7 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
         partitionSpecInMetastore: Option[PartitionSpec]): 
Option[LogicalRelation] = {
       cachedDataSourceTables.getIfPresent(tableIdentifier) match {
         case null => None // Cache miss
-        case logical @ LogicalRelation(parquetRelation: ParquetRelation) =>
+        case logical @ LogicalRelation(parquetRelation: ParquetRelation, _) =>
           // If we have the same paths, same schema, and same partition spec,
           // we will use the cached Parquet Relation.
           val useCached =
@@ -514,7 +514,7 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
       parquetRelation
     }
 
-    result.newInstance()
+    result.copy(expectedOutputAttributes = Some(metastoreRelation.output))
   }
 
   override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] 
= {
@@ -553,60 +553,28 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
         return plan
       }
 
-      // Collects all `MetastoreRelation`s which should be replaced
-      val toBeReplaced = plan.collect {
+      plan transformUp {
         // Write path
-        case InsertIntoTable(relation: MetastoreRelation, _, _, _, _)
-            // Inserting into partitioned table is not supported in Parquet 
data source (yet).
-            if !relation.hiveQlTable.isPartitioned &&
-              hive.convertMetastoreParquet &&
-              
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
-          val parquetRelation = convertToParquetRelation(relation)
-          val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation, parquetRelation, attributedRewrites)
+        case InsertIntoTable(r: MetastoreRelation, partition, child, 
overwrite, ifNotExists)
+          // Inserting into partitioned table is not supported in Parquet data 
source (yet).
+          if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet &&
+            r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+          val parquetRelation = convertToParquetRelation(r)
+          InsertIntoTable(parquetRelation, partition, child, overwrite, 
ifNotExists)
 
         // Write path
-        case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _, _)
+        case InsertIntoHiveTable(r: MetastoreRelation, partition, child, 
overwrite, ifNotExists)
           // Inserting into partitioned table is not supported in Parquet data 
source (yet).
-          if !relation.hiveQlTable.isPartitioned &&
-            hive.convertMetastoreParquet &&
-            
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
-          val parquetRelation = convertToParquetRelation(relation)
-          val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation, parquetRelation, attributedRewrites)
+          if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet &&
+            r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+          val parquetRelation = convertToParquetRelation(r)
+          InsertIntoTable(parquetRelation, partition, child, overwrite, 
ifNotExists)
 
         // Read path
         case relation: MetastoreRelation if hive.convertMetastoreParquet &&
-              
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+          relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") 
=>
           val parquetRelation = convertToParquetRelation(relation)
-          val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation, parquetRelation, attributedRewrites)
-      }
-
-      val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap
-      val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ 
++: _))
-
-      // Replaces all `MetastoreRelation`s with corresponding 
`ParquetRelation2`s, and fixes
-      // attribute IDs referenced in other nodes.
-      plan.transformUp {
-        case r: MetastoreRelation if relationMap.contains(r) =>
-          val parquetRelation = relationMap(r)
-          val alias = r.alias.getOrElse(r.tableName)
-          Subquery(alias, parquetRelation)
-
-        case InsertIntoTable(r: MetastoreRelation, partition, child, 
overwrite, ifNotExists)
-          if relationMap.contains(r) =>
-          val parquetRelation = relationMap(r)
-          InsertIntoTable(parquetRelation, partition, child, overwrite, 
ifNotExists)
-
-        case InsertIntoHiveTable(r: MetastoreRelation, partition, child, 
overwrite, ifNotExists)
-          if relationMap.contains(r) =>
-          val parquetRelation = relationMap(r)
-          InsertIntoTable(parquetRelation, partition, child, overwrite, 
ifNotExists)
-
-        case other => other.transformExpressions {
-          case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
-        }
+          Subquery(relation.alias.getOrElse(relation.tableName), 
parquetRelation)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/418e5e4c/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index d1699dd..9f654ee 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -235,7 +235,7 @@ case class CreateMetastoreDataSourceAsSelect(
             sqlContext, Some(query.schema.asNullable), partitionColumns, 
provider, optionsWithPath)
           val createdRelation = LogicalRelation(resolved.relation)
           
EliminateSubQueries(sqlContext.catalog.lookupRelation(tableIdent.toSeq)) match {
-            case l @ LogicalRelation(_: InsertableRelation | _: 
HadoopFsRelation) =>
+            case l @ LogicalRelation(_: InsertableRelation | _: 
HadoopFsRelation, _) =>
               if (l.relation != createdRelation.relation) {
                 val errorDescription =
                   s"Cannot append to table $tableName because the resolved 
relation does not " +

http://git-wip-us.apache.org/repos/asf/spark/blob/418e5e4c/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 80a61f8..81ee9ba 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -81,9 +81,9 @@ class InsertIntoHiveTableSuite extends QueryTest with 
TestHiveSingleton with Bef
   test("Double create fails when allowExisting = false") {
     sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
 
-    val message = intercept[QueryExecutionException] {
+    intercept[QueryExecutionException] {
       sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
-    }.getMessage
+    }
   }
 
   test("Double create does not fail when allowExisting = true") {

http://git-wip-us.apache.org/repos/asf/spark/blob/418e5e4c/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index bf0db08..d356538 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -570,7 +570,7 @@ class MetastoreDataSourcesSuite extends QueryTest with 
SQLTestUtils with TestHiv
             Row(3) :: Row(4) :: Nil)
 
           table("test_parquet_ctas").queryExecution.optimizedPlan match {
-            case LogicalRelation(p: ParquetRelation) => // OK
+            case LogicalRelation(p: ParquetRelation, _) => // OK
             case _ =>
               fail(s"test_parquet_ctas should have be converted to 
${classOf[ParquetRelation]}")
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/418e5e4c/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 71823e3..8c3f9ac 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -263,7 +263,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils 
with TestHiveSingleton {
     def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = 
{
       val relation = 
EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))
       relation match {
-        case LogicalRelation(r: ParquetRelation) =>
+        case LogicalRelation(r: ParquetRelation, _) =>
           if (!isDataSourceParquet) {
             fail(
               s"${classOf[MetastoreRelation].getCanonicalName} is expected, 
but found " +
@@ -1223,4 +1223,29 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils 
with TestHiveSingleton {
 
     checkAnswer(df, (0 until 5).map(i => Row(i + "#", i + "#")))
   }
+
+  test("SPARK-10741: Sort on Aggregate using parquet") {
+    withTable("test10741") {
+      withTempTable("src") {
+        Seq("a" -> 5, "a" -> 9, "b" -> 6).toDF().registerTempTable("src")
+        sql("CREATE TABLE test10741(c1 STRING, c2 INT) STORED AS PARQUET AS 
SELECT * FROM src")
+      }
+
+      checkAnswer(sql(
+        """
+          |SELECT c1, AVG(c2) AS c_avg
+          |FROM test10741
+          |GROUP BY c1
+          |HAVING (AVG(c2) > 5) ORDER BY c1
+        """.stripMargin), Row("a", 7.0) :: Row("b", 6.0) :: Nil)
+
+      checkAnswer(sql(
+        """
+          |SELECT c1, AVG(c2) AS c_avg
+          |FROM test10741
+          |GROUP BY c1
+          |ORDER BY AVG(c2)
+        """.stripMargin), Row("b", 6.0) :: Row("a", 7.0) :: Nil)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/418e5e4c/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 6842ec2..7d8104f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -282,7 +282,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest 
{
       )
 
       table("test_parquet_ctas").queryExecution.optimizedPlan match {
-        case LogicalRelation(_: ParquetRelation) => // OK
+        case LogicalRelation(_: ParquetRelation, _) => // OK
         case _ => fail(
           "test_parquet_ctas should be converted to " +
               s"${classOf[ParquetRelation].getCanonicalName }")
@@ -369,7 +369,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest 
{
 
       assertResult(2) {
         analyzed.collect {
-          case r@LogicalRelation(_: ParquetRelation) => r
+          case r @ LogicalRelation(_: ParquetRelation, _) => r
         }.size
       }
     }
@@ -378,7 +378,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest 
{
   def collectParquetRelation(df: DataFrame): ParquetRelation = {
     val plan = df.queryExecution.analyzed
     plan.collectFirst {
-      case LogicalRelation(r: ParquetRelation) => r
+      case LogicalRelation(r: ParquetRelation, _) => r
     }.getOrElse {
       fail(s"Expecting a ParquetRelation2, but got:\n$plan")
     }
@@ -428,7 +428,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest 
{
       // Converted test_parquet should be cached.
       catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match {
         case null => fail("Converted test_parquet should be cached in the 
cache.")
-        case logical @ LogicalRelation(parquetRelation: ParquetRelation) => // 
OK
+        case logical @ LogicalRelation(parquetRelation: ParquetRelation, _) => 
// OK
         case other =>
           fail(
             "The cached test_parquet should be a Parquet Relation. " +

http://git-wip-us.apache.org/repos/asf/spark/blob/418e5e4c/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index d750493..42b9b3d 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -499,7 +499,7 @@ abstract class HadoopFsRelationTest extends QueryTest with 
SQLTestUtils with Tes
       }
 
       val actualPaths = df.queryExecution.analyzed.collectFirst {
-        case LogicalRelation(relation: HadoopFsRelation) =>
+        case LogicalRelation(relation: HadoopFsRelation, _) =>
           relation.paths.toSet
       }.getOrElse {
         fail("Expect an FSBasedRelation, but none could be found")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to