Repository: spark

Updated Branches:
  refs/heads/branch-1.0 9d0fae936 -> 6db0d5cfe
Avoid dynamic dispatching when unwrapping Hive data.

This is a follow-up of PR #758. The `unwrapHiveData` function is now composed
statically, according to the field object inspector, before any actual rows
are scanned, to avoid the cost of dynamic dispatch. According to the same
micro benchmark used in PR #758, this simple change brings a slight
performance boost: 2.5% for the CSV table and 1% for the RCFile table.

```
Optimized version:

CSV: 6870 ms, RCFile: 5687 ms
CSV: 6832 ms, RCFile: 5800 ms
CSV: 6822 ms, RCFile: 5679 ms
CSV: 6704 ms, RCFile: 5758 ms
CSV: 6819 ms, RCFile: 5725 ms

Original version:

CSV: 7042 ms, RCFile: 5667 ms
CSV: 6883 ms, RCFile: 5703 ms
CSV: 7115 ms, RCFile: 5665 ms
CSV: 7020 ms, RCFile: 5981 ms
CSV: 6871 ms, RCFile: 5906 ms
```

Author: Cheng Lian <lian.cs....@gmail.com>

Closes #935 from liancheng/staticUnwrapping and squashes the following commits:

c49c70c [Cheng Lian] Avoid dynamic dispatching when unwrapping Hive data.

(cherry picked from commit 862283e9ccace6824880aa4e161723fb3248d438)
Signed-off-by: Michael Armbrust <mich...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6db0d5cf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6db0d5cf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6db0d5cf

Branch: refs/heads/branch-1.0
Commit: 6db0d5cfeed170b902e46fa2b83fc846ebc0d044
Parents: 9d0fae9
Author: Cheng Lian <lian.cs....@gmail.com>
Authored: Mon Jun 2 19:20:23 2014 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Mon Jun 2 19:20:39 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/hive/hiveOperators.scala   | 28 +++++++++++---------
 .../sql/hive/execution/HiveComparisonTest.scala |  5 ++--
 2 files changed, 18 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6db0d5cf/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
index d263c31..29b4b9b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
@@ -26,8 +26,7 @@ import org.apache.hadoop.hive.ql.plan.{TableDesc, FileSinkDesc}
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.hive.serde2.objectinspector._
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharObjectInspector
+import org.apache.hadoop.hive.serde2.objectinspector.primitive._
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
 import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Serializer}
 import org.apache.hadoop.io.Writable
@@ -95,29 +94,34 @@ case class HiveTableScan(
     attributes.map { a =>
       val ordinal = relation.partitionKeys.indexOf(a)
       if (ordinal >= 0) {
+        val dataType = relation.partitionKeys(ordinal).dataType
         (_: Any, partitionKeys: Array[String]) => {
-          val value = partitionKeys(ordinal)
-          val dataType = relation.partitionKeys(ordinal).dataType
-          unwrapHiveData(castFromString(value, dataType))
+          castFromString(partitionKeys(ordinal), dataType)
         }
       } else {
         val ref = objectInspector.getAllStructFieldRefs
           .find(_.getFieldName == a.name)
           .getOrElse(sys.error(s"Can't find attribute $a"))
+        val fieldObjectInspector = ref.getFieldObjectInspector
+
+        val unwrapHiveData = fieldObjectInspector match {
+          case _: HiveVarcharObjectInspector =>
+            (value: Any) => value.asInstanceOf[HiveVarchar].getValue
+          case _: HiveDecimalObjectInspector =>
+            (value: Any) => BigDecimal(value.asInstanceOf[HiveDecimal].bigDecimalValue())
+          case _ =>
+            identity[Any] _
+        }
+
         (row: Any, _: Array[String]) => {
           val data = objectInspector.getStructFieldData(row, ref)
-          unwrapHiveData(unwrapData(data, ref.getFieldObjectInspector))
+          val hiveData = unwrapData(data, fieldObjectInspector)
+          if (hiveData != null) unwrapHiveData(hiveData) else null
         }
       }
     }
   }
 
-  private def unwrapHiveData(value: Any) = value match {
-    case varchar: HiveVarchar => varchar.getValue
-    case decimal: HiveDecimal => BigDecimal(decimal.bigDecimalValue)
-    case other => other
-  }
-
   private def castFromString(value: String, dataType: DataType) = {
     Cast(Literal(value), dataType).eval(null)
   }


http://git-wip-us.apache.org/repos/asf/spark/blob/6db0d5cf/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 1b5a132..0f95410 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -133,15 +133,14 @@ abstract class HiveComparisonTest
     def isSorted(plan: LogicalPlan): Boolean = plan match {
       case _: Join | _: Aggregate | _: BaseRelation | _: Generate | _: Sample | _: Distinct => false
       case PhysicalOperation(_, _, Sort(_, _)) => true
-      case _ => plan.children.iterator.map(isSorted).exists(_ == true)
+      case _ => plan.children.iterator.exists(isSorted)
     }
 
     val orderedAnswer = hiveQuery.logical match {
       // Clean out non-deterministic time schema info.
       case _: NativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "")
       case _: ExplainCommand => answer
-      case plan if isSorted(plan) => answer
-      case _ => answer.sorted
+      case plan => if (isSorted(plan)) answer else answer.sorted
     }
     orderedAnswer.map(cleanPaths)
   }
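
For reference, the pattern the patch applies is: resolve the per-type unwrapping branch once per column, before the scan starts, so the row loop applies a pre-selected function instead of pattern matching on every value. Below is a minimal, self-contained Scala sketch of that idea; `Inspector`, `Varchar`, and `Decimal` are hypothetical stand-ins invented for illustration, not the actual Hive object-inspector classes the patch dispatches on.

```
object StaticUnwrapSketch {
  // Hypothetical stand-ins for Hive's ObjectInspector hierarchy.
  sealed trait Inspector
  case object VarcharInspector extends Inspector
  case object DecimalInspector extends Inspector
  case object PlainInspector extends Inspector

  final case class Varchar(value: String)
  final case class Decimal(value: java.math.BigDecimal)

  // Before: one pattern match per value, i.e. once for every row and column.
  def unwrapDynamic(value: Any): Any = value match {
    case Varchar(s) => s
    case Decimal(d) => BigDecimal(d)
    case other      => other
  }

  // After: the match runs once per column; the returned function is then
  // applied to every row with no further type dispatch.
  def unwrapperFor(inspector: Inspector): Any => Any = inspector match {
    case VarcharInspector => (v: Any) => v.asInstanceOf[Varchar].value
    case DecimalInspector => (v: Any) => BigDecimal(v.asInstanceOf[Decimal].value)
    case PlainInspector   => identity[Any] _
  }

  def main(args: Array[String]): Unit = {
    val column: Seq[Any] = Seq(Varchar("a"), Varchar("b"), Varchar("c"))
    val unwrap = unwrapperFor(VarcharInspector) // composed once, before the scan
    println(column.map(unwrap))                // prints: List(a, b, c)
  }
}
```

Note that the patched `HiveTableScan` also adds an explicit null check before applying the composed function, since `unwrapData` may return null for a field.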