Repository: spark
Updated Branches:
  refs/heads/master 61e48f52d -> e635cbb6e


[SPARK-18801][SQL][FOLLOWUP] Alias the view with its child

## What changes were proposed in this pull request?

This PR is a follow-up to address the comments 
https://github.com/apache/spark/pull/16233/files#r95669988 and 
https://github.com/apache/spark/pull/16233/files#r95662299.

We try to wrap the child as follows:
1. Generate the `queryOutput`:
    1.1. If the query column names are defined, map the column names to 
attributes in the child output by name;
    1.2. Otherwise, use the child output attributes as the `queryOutput`.
2. Map the `queryOutput` to the view output by index; if the corresponding 
attributes don't match, try to up cast and alias the attribute in `queryOutput` 
to the attribute in the view output.
3. Add a Project over the child, with the new output generated by the previous 
steps.
If the number of columns in the view output matches neither that of the 
child output nor that of the query column names, throw an AnalysisException.

## How was this patch tested?

Add new test cases in `SQLViewSuite`.

Author: jiangxingbo <jiangxb1...@gmail.com>

Closes #16561 from jiangxb1987/alias-view.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e635cbb6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e635cbb6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e635cbb6

Branch: refs/heads/master
Commit: e635cbb6e61dee450db0ef45f3d82ac282a85d3c
Parents: 61e48f5
Author: jiangxingbo <jiangxb1...@gmail.com>
Authored: Mon Jan 16 19:11:21 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Mon Jan 16 19:11:21 2017 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  24 +---
 .../spark/sql/catalyst/analysis/view.scala      |  66 ++++++++---
 .../spark/sql/catalyst/catalog/interface.scala  |  49 +++++++-
 .../spark/sql/catalyst/expressions/Cast.scala   |  21 +++-
 .../spark/sql/hive/execution/SQLViewSuite.scala | 111 +++++++++++++++----
 5 files changed, 214 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e635cbb6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 1957df8..bd9037e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2281,12 +2281,6 @@ class Analyzer(
         "type of the field in the target object")
     }
 
-    private def illegalNumericPrecedence(from: DataType, to: DataType): 
Boolean = {
-      val fromPrecedence = TypeCoercion.numericPrecedence.indexOf(from)
-      val toPrecedence = TypeCoercion.numericPrecedence.indexOf(to)
-      toPrecedence > 0 && fromPrecedence > toPrecedence
-    }
-
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
       case p if !p.childrenResolved => p
       case p if p.resolved => p
@@ -2294,19 +2288,11 @@ class Analyzer(
       case p => p transformExpressions {
         case u @ UpCast(child, _, _) if !child.resolved => u
 
-        case UpCast(child, dataType, walkedTypePath) => (child.dataType, 
dataType) match {
-          case (from: NumericType, to: DecimalType) if !to.isWiderThan(from) =>
-            fail(child, to, walkedTypePath)
-          case (from: DecimalType, to: NumericType) if !from.isTighterThan(to) 
=>
-            fail(child, to, walkedTypePath)
-          case (from, to) if illegalNumericPrecedence(from, to) =>
-            fail(child, to, walkedTypePath)
-          case (TimestampType, DateType) =>
-            fail(child, DateType, walkedTypePath)
-          case (StringType, to: NumericType) =>
-            fail(child, to, walkedTypePath)
-          case _ => Cast(child, dataType.asNullable)
-        }
+        case UpCast(child, dataType, walkedTypePath)
+          if Cast.mayTruncate(child.dataType, dataType) =>
+          fail(child, dataType, walkedTypePath)
+
+        case UpCast(child, dataType, walkedTypePath) => Cast(child, 
dataType.asNullable)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/e635cbb6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
index 737f846..a5640a6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
@@ -28,22 +28,60 @@ import org.apache.spark.sql.catalyst.rules.Rule
  */
 
 /**
- * Make sure that a view's child plan produces the view's output attributes. 
We wrap the child
- * with a Project and add an alias for each output attribute. The attributes 
are resolved by
- * name. This should be only done after the batch of Resolution, because the 
view attributes are
- * not completely resolved during the batch of Resolution.
+ * Make sure that a view's child plan produces the view's output attributes. 
We try to wrap the
+ * child by:
+ * 1. Generate the `queryOutput` by:
+ *    1.1. If the query column names are defined, map the column names to 
attributes in the child
+ *         output by name(This is mostly for handling view queries like SELECT 
* FROM ..., the
+ *         schema of the referenced table/view may change after the view has 
been created, so we
+ *         have to save the output of the query to `viewQueryColumnNames`, and 
restore them during
+ *         view resolution, in this way, we are able to get the correct view 
column ordering and
+ *         omit the extra columns that we don't require);
+ *    1.2. Else set the child output attributes to `queryOutput`.
+ * 2. Map the `queryQutput` to view output by index, if the corresponding 
attributes don't match,
+ *    try to up cast and alias the attribute in `queryOutput` to the attribute 
in the view output.
+ * 3. Add a Project over the child, with the new output generated by the 
previous steps.
+ * If the view output doesn't have the same number of columns neither with the 
child output, nor
+ * with the query column names, throw an AnalysisException.
+ *
+ * This should be only done after the batch of Resolution, because the view 
attributes are not
+ * completely resolved during the batch of Resolution.
  */
 case class AliasViewChild(conf: CatalystConf) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case v @ View(_, output, child) if child.resolved =>
+    case v @ View(desc, output, child) if child.resolved && output != 
child.output =>
       val resolver = conf.resolver
-      val newOutput = output.map { attr =>
-        val originAttr = findAttributeByName(attr.name, child.output, resolver)
-        // The dataType of the output attributes may be not the same with that 
of the view output,
-        // so we should cast the attribute to the dataType of the view output 
attribute. If the
-        // cast can't perform, will throw an AnalysisException.
-        Alias(Cast(originAttr, attr.dataType), attr.name)(exprId = attr.exprId,
-          qualifier = attr.qualifier, explicitMetadata = Some(attr.metadata))
+      val queryColumnNames = desc.viewQueryColumnNames
+      val queryOutput = if (queryColumnNames.nonEmpty) {
+        // If the view output doesn't have the same number of columns with the 
query column names,
+        // throw an AnalysisException.
+        if (output.length != queryColumnNames.length) {
+          throw new AnalysisException(
+            s"The view output ${output.mkString("[", ",", "]")} doesn't have 
the same number of " +
+              s"columns with the query column names 
${queryColumnNames.mkString("[", ",", "]")}")
+        }
+        desc.viewQueryColumnNames.map { colName =>
+          findAttributeByName(colName, child.output, resolver)
+        }
+      } else {
+        // For view created before Spark 2.2.0, the view text is already fully 
qualified, the plan
+        // output is the same with the view output.
+        child.output
+      }
+      // Map the attributes in the query output to the attributes in the view 
output by index.
+      val newOutput = output.zip(queryOutput).map {
+        case (attr, originAttr) if attr != originAttr =>
+          // The dataType of the output attributes may be not the same with 
that of the view
+          // output, so we should cast the attribute to the dataType of the 
view output attribute.
+          // Will throw an AnalysisException if the cast can't perform or 
might truncate.
+          if (Cast.mayTruncate(originAttr.dataType, attr.dataType)) {
+            throw new AnalysisException(s"Cannot up cast ${originAttr.sql} 
from " +
+              s"${originAttr.dataType.simpleString} to ${attr.simpleString} as 
it may truncate\n")
+          } else {
+            Alias(Cast(originAttr, attr.dataType), attr.name)(exprId = 
attr.exprId,
+              qualifier = attr.qualifier, explicitMetadata = 
Some(attr.metadata))
+          }
+        case (_, originAttr) => originAttr
       }
       v.copy(child = Project(newOutput, child))
   }
@@ -74,7 +112,9 @@ object EliminateView extends Rule[LogicalPlan] {
     // The child should have the same output attributes with the View 
operator, so we simply
     // remove the View operator.
     case View(_, output, child) =>
-      assert(output == child.output, "The output of the child is different 
from the view output")
+      assert(output == child.output,
+        s"The output of the child ${child.output.mkString("[", ",", "]")} is 
different from the " +
+          s"view output ${output.mkString("[", ",", "]")}")
       child
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e635cbb6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index a9de107..2adccdd 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -19,12 +19,15 @@ package org.apache.spark.sql.catalyst.catalog
 
 import java.util.Date
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, 
TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
Cast, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.StructType
+
 
 
 /**
@@ -178,6 +181,8 @@ case class CatalogTable(
     unsupportedFeatures: Seq[String] = Seq.empty,
     tracksPartitionsInCatalog: Boolean = false) {
 
+  import CatalogTable._
+
   /** schema of this table's partition columns */
   def partitionSchema: StructType = StructType(schema.filter {
     c => partitionColumnNames.contains(c.name)
@@ -198,9 +203,44 @@ case class CatalogTable(
 
   /**
    * Return the default database name we use to resolve a view, should be None 
if the CatalogTable
-   * is not a View.
+   * is not a View or created by older versions of Spark(before 2.2.0).
+   */
+  def viewDefaultDatabase: Option[String] = 
properties.get(VIEW_DEFAULT_DATABASE)
+
+  /**
+   * Return the output column names of the query that creates a view, the 
column names are used to
+   * resolve a view, should be empty if the CatalogTable is not a View or 
created by older versions
+   * of Spark(before 2.2.0).
+   */
+  def viewQueryColumnNames: Seq[String] = {
+    for {
+      numCols <- properties.get(VIEW_QUERY_OUTPUT_NUM_COLUMNS).toSeq
+      index <- 0 until numCols.toInt
+    } yield properties.getOrElse(
+      s"$VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX$index",
+      throw new AnalysisException("Corrupted view query output column names in 
catalog: " +
+        s"$numCols parts expected, but part $index is missing.")
+    )
+  }
+
+  /**
+   * Insert/Update the view query output column names in `properties`.
    */
-  def viewDefaultDatabase: Option[String] = 
properties.get(CatalogTable.VIEW_DEFAULT_DATABASE)
+  def withQueryColumnNames(columns: Seq[String]): CatalogTable = {
+    val props = new mutable.HashMap[String, String]
+    if (columns.nonEmpty) {
+      props.put(VIEW_QUERY_OUTPUT_NUM_COLUMNS, columns.length.toString)
+      columns.zipWithIndex.foreach { case (colName, index) =>
+        props.put(s"$VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX$index", colName)
+      }
+    }
+
+    // We can't use `filterKeys` here, as the map returned by `filterKeys` is 
not serializable,
+    // while `CatalogTable` should be serializable.
+    copy(properties = properties.filterNot { case (key, _) =>
+      key.startsWith(VIEW_QUERY_OUTPUT_PREFIX)
+    } ++ props)
+  }
 
   /** Syntactic sugar to update a field in `storage`. */
   def withNewStorage(
@@ -254,6 +294,9 @@ case class CatalogTable(
 
 object CatalogTable {
   val VIEW_DEFAULT_DATABASE = "view.default.database"
+  val VIEW_QUERY_OUTPUT_PREFIX = "view.query.out."
+  val VIEW_QUERY_OUTPUT_NUM_COLUMNS = VIEW_QUERY_OUTPUT_PREFIX + "numCols"
+  val VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX = VIEW_QUERY_OUTPUT_PREFIX + "col."
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/e635cbb6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 14e275b..ad59271 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -21,7 +21,7 @@ import java.math.{BigDecimal => JavaBigDecimal}
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion}
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.types._
@@ -89,6 +89,25 @@ object Cast {
     case _ => false
   }
 
+  /**
+   * Return true iff we may truncate during casting `from` type to `to` type. 
e.g. long -> int,
+   * timestamp -> date.
+   */
+  def mayTruncate(from: DataType, to: DataType): Boolean = (from, to) match {
+    case (from: NumericType, to: DecimalType) if !to.isWiderThan(from) => true
+    case (from: DecimalType, to: NumericType) if !from.isTighterThan(to) => 
true
+    case (from, to) if illegalNumericPrecedence(from, to) => true
+    case (TimestampType, DateType) => true
+    case (StringType, to: NumericType) => true
+    case _ => false
+  }
+
+  private def illegalNumericPrecedence(from: DataType, to: DataType): Boolean 
= {
+    val fromPrecedence = TypeCoercion.numericPrecedence.indexOf(from)
+    val toPrecedence = TypeCoercion.numericPrecedence.indexOf(to)
+    toPrecedence > 0 && fromPrecedence > toPrecedence
+  }
+
   def forceNullable(from: DataType, to: DataType): Boolean = (from, to) match {
     case (NullType, _) => true
     case (_, _) if from == to => false

http://git-wip-us.apache.org/repos/asf/spark/blob/e635cbb6/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
index e06d0ae..9bc078d 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
@@ -553,18 +553,24 @@ class SQLViewSuite extends QueryTest with SQLTestUtils 
with TestHiveSingleton {
           identifier = TableIdentifier("view1", Some(db)),
           tableType = CatalogTableType.VIEW,
           storage = CatalogStorageFormat.empty,
-          schema = new StructType().add("id", "int").add("id1", "int"),
+          schema = new StructType().add("x", "long").add("y", "long"),
           viewOriginalText = Some("SELECT * FROM jt"),
           viewText = Some("SELECT * FROM jt"),
-          properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+          properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default",
+            CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2",
+            s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "id",
+            s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "id1"))
         val view2 = CatalogTable(
           identifier = TableIdentifier("view2", Some(db)),
           tableType = CatalogTableType.VIEW,
           storage = CatalogStorageFormat.empty,
-          schema = new StructType().add("id", "int").add("id1", "int"),
+          schema = new StructType().add("id", "long").add("id1", "long"),
           viewOriginalText = Some("SELECT * FROM view1"),
           viewText = Some("SELECT * FROM view1"),
-          properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> db))
+          properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> db,
+            CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2",
+            s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "x",
+            s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "y"))
         activateDatabase(db) {
           hiveContext.sessionState.catalog.createTable(view1, ignoreIfExists = 
false)
           hiveContext.sessionState.catalog.createTable(view2, ignoreIfExists = 
false)
@@ -583,7 +589,9 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with 
TestHiveSingleton {
         schema = new StructType().add("n", "int"),
         viewOriginalText = Some("WITH w AS (SELECT 1 AS n) SELECT n FROM w"),
         viewText = Some("WITH w AS (SELECT 1 AS n) SELECT n FROM w"),
-        properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+        properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default",
+          CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "1",
+          s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "n"))
       hiveContext.sessionState.catalog.createTable(cte_view, ignoreIfExists = 
false)
       checkAnswer(sql("SELECT * FROM cte_view"), Row(1))
     }
@@ -595,10 +603,13 @@ class SQLViewSuite extends QueryTest with SQLTestUtils 
with TestHiveSingleton {
         identifier = TableIdentifier("join_view"),
         tableType = CatalogTableType.VIEW,
         storage = CatalogStorageFormat.empty,
-        schema = new StructType().add("id", "int").add("id1", "int"),
+        schema = new StructType().add("id", "long").add("id1", "long"),
         viewOriginalText = Some("SELECT * FROM jt"),
         viewText = Some("SELECT * FROM jt"),
-        properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+        properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default",
+          CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2",
+          s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "id",
+          s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "id1"))
       hiveContext.sessionState.catalog.createTable(join_view, ignoreIfExists = 
false)
       checkAnswer(
         sql("SELECT * FROM join_view t1 JOIN join_view t2 ON t1.id = t2.id 
ORDER BY t1.id"),
@@ -620,10 +631,13 @@ class SQLViewSuite extends QueryTest with SQLTestUtils 
with TestHiveSingleton {
         identifier = TableIdentifier("view1"),
         tableType = CatalogTableType.VIEW,
         storage = CatalogStorageFormat.empty,
-        schema = new StructType().add("id", "int").add("id1", "int"),
+        schema = new StructType().add("id", "long").add("id1", "long"),
         viewOriginalText = Some("SELECT * FROM invalid_db.jt"),
         viewText = Some("SELECT * FROM invalid_db.jt"),
-        properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+        properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default",
+          CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2",
+          s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "id",
+          s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "id1"))
       hiveContext.sessionState.catalog.createTable(view1, ignoreIfExists = 
false)
       assertInvalidReference("SELECT * FROM view1")
 
@@ -632,10 +646,13 @@ class SQLViewSuite extends QueryTest with SQLTestUtils 
with TestHiveSingleton {
         identifier = TableIdentifier("view2"),
         tableType = CatalogTableType.VIEW,
         storage = CatalogStorageFormat.empty,
-        schema = new StructType().add("id", "int").add("id1", "int"),
+        schema = new StructType().add("id", "long").add("id1", "long"),
         viewOriginalText = Some("SELECT * FROM invalid_table"),
         viewText = Some("SELECT * FROM invalid_table"),
-        properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+        properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default",
+          CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2",
+          s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "id",
+          s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "id1"))
       hiveContext.sessionState.catalog.createTable(view2, ignoreIfExists = 
false)
       assertInvalidReference("SELECT * FROM view2")
 
@@ -644,10 +661,13 @@ class SQLViewSuite extends QueryTest with SQLTestUtils 
with TestHiveSingleton {
         identifier = TableIdentifier("view3"),
         tableType = CatalogTableType.VIEW,
         storage = CatalogStorageFormat.empty,
-        schema = new StructType().add("id", "int").add("id1", "int"),
+        schema = new StructType().add("id", "long").add("id1", "long"),
         viewOriginalText = Some("SELECT * FROM view2"),
         viewText = Some("SELECT * FROM view2"),
-        properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default"))
+        properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default",
+          CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2",
+          s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "id",
+          s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "id1"))
       hiveContext.sessionState.catalog.createTable(view3, ignoreIfExists = 
false)
       assertInvalidReference("SELECT * FROM view3")
     }
@@ -680,21 +700,70 @@ class SQLViewSuite extends QueryTest with SQLTestUtils 
with TestHiveSingleton {
     }
   }
 
-  test("correctly handle type casting between view output and child output") {
+  test("resolve a view with custom column names") {
     withTable("testTable") {
+      spark.range(1, 10).selectExpr("id", "id + 1 
id1").write.saveAsTable("testTable")
       withView("testView") {
-        spark.range(1, 
10).toDF("id1").write.format("json").saveAsTable("testTable")
-        sql("CREATE VIEW testView AS SELECT * FROM testTable")
+        val testView = CatalogTable(
+          identifier = TableIdentifier("testView"),
+          tableType = CatalogTableType.VIEW,
+          storage = CatalogStorageFormat.empty,
+          schema = new StructType().add("x", "long").add("y", "long"),
+          viewOriginalText = Some("SELECT * FROM testTable"),
+          viewText = Some("SELECT * FROM testTable"),
+          properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default",
+            CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2",
+            s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "id",
+            s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "id1"))
+        hiveContext.sessionState.catalog.createTable(testView, ignoreIfExists 
= false)
+
+        // Correctly resolve a view with custom column names.
+        checkAnswer(sql("SELECT * FROM testView ORDER BY x"), (1 to 9).map(i 
=> Row(i, i + 1)))
+
+        // Correctly resolve a view when the referenced table schema changes.
+        spark.range(1, 10).selectExpr("id", "id + id dummy", "id + 1 id1")
+          .write.mode(SaveMode.Overwrite).saveAsTable("testTable")
+        checkAnswer(sql("SELECT * FROM testView ORDER BY x"), (1 to 9).map(i 
=> Row(i, i + 1)))
+
+        // Throw an AnalysisException if the column name is not found.
+        spark.range(1, 10).selectExpr("id", "id + 1 dummy")
+          .write.mode(SaveMode.Overwrite).saveAsTable("testTable")
+        intercept[AnalysisException](sql("SELECT * FROM testView"))
+      }
+    }
+  }
+
+  test("resolve a view when the dataTypes of referenced table columns 
changed") {
+    withTable("testTable") {
+      spark.range(1, 10).selectExpr("id", "id + 1 
id1").write.saveAsTable("testTable")
+      withView("testView") {
+        val testView = CatalogTable(
+          identifier = TableIdentifier("testView"),
+          tableType = CatalogTableType.VIEW,
+          storage = CatalogStorageFormat.empty,
+          schema = new StructType().add("id", "long").add("id1", "long"),
+          viewOriginalText = Some("SELECT * FROM testTable"),
+          viewText = Some("SELECT * FROM testTable"),
+          properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default",
+            CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2",
+            s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "id",
+            s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "id1"))
+        hiveContext.sessionState.catalog.createTable(testView, ignoreIfExists 
= false)
 
         // Allow casting from IntegerType to LongType
-        val df = (1 until 10).map(i => i).toDF("id1")
+        val df = (1 until 10).map(i => (i, i + 1)).toDF("id", "id1")
         
df.write.format("json").mode(SaveMode.Overwrite).saveAsTable("testTable")
-        checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i 
=> Row(i)))
+        checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i 
=> Row(i, i + 1)))
 
-        // Can't cast from ArrayType to LongType, throw an AnalysisException.
-        val df2 = (1 until 10).map(i => Seq(i)).toDF("id1")
+        // Casting from DoubleType to LongType might truncate, throw an 
AnalysisException.
+        val df2 = (1 until 10).map(i => (i.toDouble, i.toDouble)).toDF("id", 
"id1")
         
df2.write.format("json").mode(SaveMode.Overwrite).saveAsTable("testTable")
-        intercept[AnalysisException](sql("SELECT * FROM testView ORDER BY 
id1"))
+        intercept[AnalysisException](sql("SELECT * FROM testView"))
+
+        // Can't cast from ArrayType to LongType, throw an AnalysisException.
+        val df3 = (1 until 10).map(i => (i, Seq(i))).toDF("id", "id1")
+        
df3.write.format("json").mode(SaveMode.Overwrite).saveAsTable("testTable")
+        intercept[AnalysisException](sql("SELECT * FROM testView"))
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to