Repository: spark
Updated Branches:
  refs/heads/branch-2.2 2a9697593 -> fdea642db


[SPARK-21739][SQL] Cast expression should initialize timezoneId when it is called statically to convert something into TimestampType

## What changes were proposed in this pull request?

https://issues.apache.org/jira/projects/SPARK/issues/SPARK-21739

This issue was introduced by `TimeZoneAwareExpression`: when a **Cast** 
expression converts a value to TimestampType, it must be resolved with its 
`timezoneId` set. Normally this happens during analysis of the LogicalPlan.

However, a few places still construct a Cast expression statically to convert 
data types without setting `timezoneId`. In those cases, evaluating a cast to 
TimestampType throws `NoSuchElementException: None.get`.

This PR fixes the issue. We checked the whole project and found two such 
usages (in `TableReader` and `HiveTableScanExec`).
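
A minimal sketch of the failure and of the fix pattern (the literal value and 
time zone below are arbitrary placeholders; `CastSupport.cast` fills in the 
session time zone from `SQLConf`):

```scala
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.TimestampType

// A statically constructed Cast has timeZoneId = None, so evaluating a
// conversion to TimestampType fails with "NoSuchElementException: None.get".
Cast(Literal("2010-01-01 00:00:00"), TimestampType).eval(null)

// Supplying a time zone (as CastSupport.cast does via
// conf.sessionLocalTimeZone) makes the same conversion succeed.
Cast(Literal("2010-01-01 00:00:00"), TimestampType, Option("GMT")).eval(null)
```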

## How was this patch tested?

Unit test added in `QueryPartitionSuite`.

Author: donnyzone <wellfeng...@gmail.com>

Closes #18960 from DonnyZone/spark-21739.

(cherry picked from commit 310454be3b0ce5ff6b6ef0070c5daadf6fb16927)
Signed-off-by: gatorsmile <gatorsm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fdea642d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fdea642d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fdea642d

Branch: refs/heads/branch-2.2
Commit: fdea642dbd17d74c8bf136c1746159acaa937d25
Parents: 2a96975
Author: donnyzone <wellfeng...@gmail.com>
Authored: Thu Aug 17 22:37:32 2017 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Thu Aug 17 22:37:41 2017 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/hive/TableReader.scala    |  8 ++++++--
 .../sql/hive/execution/HiveTableScanExec.scala     |  8 ++++++--
 .../spark/sql/hive/QueryPartitionSuite.scala       | 17 +++++++++++++++++
 3 files changed, 29 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fdea642d/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 16c1103..a0e379f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -39,8 +39,10 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.CastSupport
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
@@ -65,7 +67,7 @@ class HadoopTableReader(
     @transient private val tableDesc: TableDesc,
     @transient private val sparkSession: SparkSession,
     hadoopConf: Configuration)
-  extends TableReader with Logging {
+  extends TableReader with CastSupport with Logging {
 
   // Hadoop honors "mapreduce.job.maps" as hint,
   // but will ignore when mapreduce.jobtracker.address is "local".
@@ -86,6 +88,8 @@ class HadoopTableReader(
   private val _broadcastedHadoopConf =
     sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
+  override def conf: SQLConf = sparkSession.sessionState.conf
+
   override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] =
     makeRDDForTable(
       hiveTable,
@@ -227,7 +231,7 @@ class HadoopTableReader(
      def fillPartitionKeys(rawPartValues: Array[String], row: InternalRow): Unit = {
         partitionKeyAttrs.foreach { case (attr, ordinal) =>
           val partOrdinal = partitionKeys.indexOf(attr)
-          row(ordinal) = Cast(Literal(rawPartValues(partOrdinal)), attr.dataType).eval(null)
+          row(ordinal) = cast(Literal(rawPartValues(partOrdinal)), attr.dataType).eval(null)
         }
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fdea642d/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 75b076b..2ce8ccf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.CastSupport
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
@@ -37,6 +38,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{BooleanType, DataType}
 import org.apache.spark.util.Utils
 
@@ -53,11 +55,13 @@ case class HiveTableScanExec(
     relation: HiveTableRelation,
     partitionPruningPred: Seq[Expression])(
     @transient private val sparkSession: SparkSession)
-  extends LeafExecNode {
+  extends LeafExecNode with CastSupport {
 
   require(partitionPruningPred.isEmpty || relation.isPartitioned,
     "Partition pruning predicates only supported for partitioned tables.")
 
+  override def conf: SQLConf = sparkSession.sessionState.conf
+
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"))
 
@@ -104,7 +108,7 @@ case class HiveTableScanExec(
     hadoopConf)
 
   private def castFromString(value: String, dataType: DataType) = {
-    Cast(Literal(value), dataType).eval(null)
+    cast(Literal(value), dataType).eval(null)
   }
 
   private def addColumnMetadataToConf(hiveConf: Configuration): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/fdea642d/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
index 43b6bf5..b2dc401 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import java.io.File
+import java.sql.Timestamp
 
 import com.google.common.io.Files
 import org.apache.hadoop.fs.FileSystem
@@ -68,4 +69,20 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingl
       sql("DROP TABLE IF EXISTS createAndInsertTest")
     }
   }
+
+  test("SPARK-21739: Cast expression should initialize timezoneId") {
+    withTable("table_with_timestamp_partition") {
+      sql("CREATE TABLE table_with_timestamp_partition(value int) PARTITIONED 
BY (ts TIMESTAMP)")
+      sql("INSERT OVERWRITE TABLE table_with_timestamp_partition " +
+        "PARTITION (ts = '2010-01-01 00:00:00.000') VALUES (1)")
+
+      // test for Cast expression in TableReader
+      checkAnswer(sql("SELECT * FROM table_with_timestamp_partition"),
+        Seq(Row(1, Timestamp.valueOf("2010-01-01 00:00:00.000"))))
+
+      // test for Cast expression in HiveTableScanExec
+      checkAnswer(sql("SELECT value FROM table_with_timestamp_partition " +
+        "WHERE ts = '2010-01-01 00:00:00.000'"), Row(1))
+    }
+  }
 }

