Repository: spark Updated Branches: refs/heads/master b6b44853c -> 8fade8973
[SPARK-2263][SQL] Support inserting MAP<K, V> to Hive tables JIRA issue: [SPARK-2263](https://issues.apache.org/jira/browse/SPARK-2263) Map objects were not converted to Hive types before inserting into Hive tables. Author: Cheng Lian <lian.cs....@gmail.com> Closes #1205 from liancheng/spark-2263 and squashes the following commits: c7a4373 [Cheng Lian] Addressed @concretevitamin's comment 784940b [Cheng Lian] SPARK-2263: support inserting MAP<K, V> to Hive tables Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8fade897 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8fade897 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8fade897 Branch: refs/heads/master Commit: 8fade8973e5fc97f781de5344beb66b90bd6e524 Parents: b6b4485 Author: Cheng Lian <lian.cs....@gmail.com> Authored: Wed Jun 25 00:14:34 2014 -0700 Committer: Reynold Xin <r...@apache.org> Committed: Wed Jun 25 00:14:34 2014 -0700 ---------------------------------------------------------------------- .../sql/hive/execution/InsertIntoHiveTable.scala | 8 ++++++++ .../spark/sql/hive/execution/HiveQuerySuite.scala | 15 ++++++++++++--- .../spark/sql/hive/execution/HiveUdfSuite.scala | 3 --- 3 files changed, 20 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/8fade897/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 594a803..c2b0b00 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -19,6 +19,8 
@@ package org.apache.spark.sql.hive.execution import scala.collection.JavaConversions._ +import java.util.{HashMap => JHashMap} + import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar} import org.apache.hadoop.hive.metastore.MetaStoreUtils import org.apache.hadoop.hive.ql.Context @@ -88,6 +90,12 @@ case class InsertIntoHiveTable( val wrappedSeq = s.map(wrap(_, oi.getListElementObjectInspector)) seqAsJavaList(wrappedSeq) + case (m: Map[_, _], oi: MapObjectInspector) => + val keyOi = oi.getMapKeyObjectInspector + val valueOi = oi.getMapValueObjectInspector + val wrappedMap = m.map { case (key, value) => wrap(key, keyOi) -> wrap(value, valueOi) } + mapAsJavaMap(wrappedMap) + case (obj, _) => obj } http://git-wip-us.apache.org/repos/asf/spark/blob/8fade897/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index d855310..9f1cd70 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -228,7 +228,7 @@ class HiveQuerySuite extends HiveComparisonTest { val fixture = List(("foo", 2), ("bar", 1), ("foo", 4), ("bar", 3)) .zipWithIndex.map {case Pair(Pair(value, attr), key) => HavingRow(key, value, attr)} TestHive.sparkContext.parallelize(fixture).registerAsTable("having_test") - val results = + val results = hql("SELECT value, max(attr) AS attr FROM having_test GROUP BY value HAVING attr > 3") .collect() .map(x => Pair(x.getString(0), x.getInt(1))) @@ -236,7 +236,7 @@ class HiveQuerySuite extends HiveComparisonTest { assert(results === Array(Pair("foo", 4))) TestHive.reset() } - + test("SPARK-2180: HAVING with non-boolean clause raises no exceptions") { 
hql("select key, count(*) c from src group by key having c").collect() } @@ -370,6 +370,16 @@ class HiveQuerySuite extends HiveComparisonTest { } } + test("SPARK-2263: Insert Map<K, V> values") { + hql("CREATE TABLE m(value MAP<INT, STRING>)") + hql("INSERT OVERWRITE TABLE m SELECT MAP(key, value) FROM src LIMIT 10") + hql("SELECT * FROM m").collect().zip(hql("SELECT * FROM src LIMIT 10").collect()).map { + case (Row(map: Map[Int, String]), Row(key: Int, value: String)) => + assert(map.size === 1) + assert(map.head === (key, value)) + } + } + test("parse HQL set commands") { // Adapted from its SQL counterpart. val testKey = "spark.sql.key.usedfortestonly" @@ -460,7 +470,6 @@ class HiveQuerySuite extends HiveComparisonTest { // Put tests that depend on specific Hive settings before these last two test, // since they modify /clear stuff. - } // for SPARK-2180 test http://git-wip-us.apache.org/repos/asf/spark/blob/8fade897/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala index a9e3f42..f944d01 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala @@ -122,6 +122,3 @@ class PairUdf extends GenericUDF { override def getDisplayString(p1: Array[String]): String = "" } - - -