Repository: spark
Updated Branches:
  refs/heads/branch-1.0 9d0fae936 -> 6db0d5cfe


Avoid dynamic dispatching when unwrapping Hive data.

This is a follow up of PR #758.

The `unwrapHiveData` function is now composed statically, based on the field
object inspector, before any rows are scanned, so that the per-row dynamic
dispatching cost is avoided.
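
As an aside, here is a minimal sketch of the idea using simplified stand-in types rather than the real Hive `ObjectInspector` and wrapper classes (the actual change is in the diff below): the unwrapping closure is selected once per field from its inspector, and that closure is then reused for every row, instead of pattern matching on each value.

```scala
// Sketch only: the Inspector ADT and the String/BigDecimal stand-ins below are
// hypothetical simplifications of Hive's object inspectors and wrapper types.
object StaticUnwrapSketch {
  sealed trait Inspector
  case object VarcharInspector extends Inspector
  case object DecimalInspector extends Inspector
  case object OtherInspector   extends Inspector

  // Before: the runtime type of every value is inspected, once per row.
  def unwrapPerRow(value: Any): Any = value match {
    case s: String               => s.trim        // stand-in for HiveVarchar.getValue
    case d: java.math.BigDecimal => BigDecimal(d) // stand-in for HiveDecimal
    case other                   => other
  }

  // After: the unwrapping function is chosen once, driven by the field's
  // inspector, and the returned closure is applied to every row.
  def buildUnwrapper(inspector: Inspector): Any => Any = inspector match {
    case VarcharInspector => (v: Any) => v.asInstanceOf[String].trim
    case DecimalInspector => (v: Any) => BigDecimal(v.asInstanceOf[java.math.BigDecimal])
    case OtherInspector   => identity[Any] _
  }

  def main(args: Array[String]): Unit = {
    val unwrap = buildUnwrapper(DecimalInspector)   // resolved once per field
    val rows   = Seq(new java.math.BigDecimal("1.5"), new java.math.BigDecimal("2.5"))
    println(rows.map(unwrap))                       // List(1.5, 2.5)
  }
}
```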

According to the same micro benchmark used in PR #758, this simple change 
brings a slight performance boost: about 2.5% for the CSV table and 1% for the 
RCFile table.

```
Optimized version:

CSV: 6870 ms, RCFile: 5687 ms
CSV: 6832 ms, RCFile: 5800 ms
CSV: 6822 ms, RCFile: 5679 ms
CSV: 6704 ms, RCFile: 5758 ms
CSV: 6819 ms, RCFile: 5725 ms

Original version:

CSV: 7042 ms, RCFile: 5667 ms
CSV: 6883 ms, RCFile: 5703 ms
CSV: 7115 ms, RCFile: 5665 ms
CSV: 7020 ms, RCFile: 5981 ms
CSV: 6871 ms, RCFile: 5906 ms
```
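
For reference, a quick sanity check of the quoted percentages (not part of the patch): the speedups are the relative differences of the five-run averages taken from the numbers above.

```scala
// Recomputes the speedup figures from the benchmark output quoted above.
object BenchmarkCheck {
  def main(args: Array[String]): Unit = {
    val csvOptimized = Seq(6870, 6832, 6822, 6704, 6819)
    val csvOriginal  = Seq(7042, 6883, 7115, 7020, 6871)
    val rcOptimized  = Seq(5687, 5800, 5679, 5758, 5725)
    val rcOriginal   = Seq(5667, 5703, 5665, 5981, 5906)

    def avg(xs: Seq[Int]): Double = xs.sum.toDouble / xs.size
    def speedup(original: Seq[Int], optimized: Seq[Int]): Double =
      (avg(original) - avg(optimized)) / avg(original) * 100

    println(f"CSV:    ${speedup(csvOriginal, csvOptimized)}%.1f%%") // ~2.5%
    println(f"RCFile: ${speedup(rcOriginal, rcOptimized)}%.1f%%")   // ~0.9%, i.e. about 1%
  }
}
```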

Author: Cheng Lian <lian.cs....@gmail.com>

Closes #935 from liancheng/staticUnwrapping and squashes the following commits:

c49c70c [Cheng Lian] Avoid dynamic dispatching when unwrapping Hive data.

(cherry picked from commit 862283e9ccace6824880aa4e161723fb3248d438)
Signed-off-by: Michael Armbrust <mich...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6db0d5cf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6db0d5cf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6db0d5cf

Branch: refs/heads/branch-1.0
Commit: 6db0d5cfeed170b902e46fa2b83fc846ebc0d044
Parents: 9d0fae9
Author: Cheng Lian <lian.cs....@gmail.com>
Authored: Mon Jun 2 19:20:23 2014 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Mon Jun 2 19:20:39 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/hive/hiveOperators.scala   | 28 +++++++++++---------
 .../sql/hive/execution/HiveComparisonTest.scala |  5 ++--
 2 files changed, 18 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6db0d5cf/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
index d263c31..29b4b9b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
@@ -26,8 +26,7 @@ import org.apache.hadoop.hive.ql.plan.{TableDesc, FileSinkDesc}
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.hive.serde2.objectinspector._
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharObjectInspector
+import org.apache.hadoop.hive.serde2.objectinspector.primitive._
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
 import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Serializer}
 import org.apache.hadoop.io.Writable
@@ -95,29 +94,34 @@ case class HiveTableScan(
     attributes.map { a =>
       val ordinal = relation.partitionKeys.indexOf(a)
       if (ordinal >= 0) {
+        val dataType = relation.partitionKeys(ordinal).dataType
         (_: Any, partitionKeys: Array[String]) => {
-          val value = partitionKeys(ordinal)
-          val dataType = relation.partitionKeys(ordinal).dataType
-          unwrapHiveData(castFromString(value, dataType))
+          castFromString(partitionKeys(ordinal), dataType)
         }
       } else {
         val ref = objectInspector.getAllStructFieldRefs
           .find(_.getFieldName == a.name)
           .getOrElse(sys.error(s"Can't find attribute $a"))
+        val fieldObjectInspector = ref.getFieldObjectInspector
+
+        val unwrapHiveData = fieldObjectInspector match {
+          case _: HiveVarcharObjectInspector =>
+            (value: Any) => value.asInstanceOf[HiveVarchar].getValue
+          case _: HiveDecimalObjectInspector =>
+            (value: Any) => BigDecimal(value.asInstanceOf[HiveDecimal].bigDecimalValue())
+          case _ =>
+            identity[Any] _
+        }
+
         (row: Any, _: Array[String]) => {
           val data = objectInspector.getStructFieldData(row, ref)
-          unwrapHiveData(unwrapData(data, ref.getFieldObjectInspector))
+          val hiveData = unwrapData(data, fieldObjectInspector)
+          if (hiveData != null) unwrapHiveData(hiveData) else null
         }
       }
     }
   }
 
-  private def unwrapHiveData(value: Any) = value match {
-    case varchar: HiveVarchar => varchar.getValue
-    case decimal: HiveDecimal => BigDecimal(decimal.bigDecimalValue)
-    case other => other
-  }
-
   private def castFromString(value: String, dataType: DataType) = {
     Cast(Literal(value), dataType).eval(null)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6db0d5cf/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 1b5a132..0f95410 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -133,15 +133,14 @@ abstract class HiveComparisonTest
     def isSorted(plan: LogicalPlan): Boolean = plan match {
      case _: Join | _: Aggregate | _: BaseRelation | _: Generate | _: Sample | _: Distinct => false
       case PhysicalOperation(_, _, Sort(_, _)) => true
-      case _ => plan.children.iterator.map(isSorted).exists(_ == true)
+      case _ => plan.children.iterator.exists(isSorted)
     }
 
     val orderedAnswer = hiveQuery.logical match {
       // Clean out non-deterministic time schema info.
      case _: NativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "")
       case _: ExplainCommand => answer
-      case plan if isSorted(plan) => answer
-      case _ => answer.sorted
+      case plan => if (isSorted(plan)) answer else answer.sorted
     }
     orderedAnswer.map(cleanPaths)
   }
