Repository: spark
Updated Branches:
  refs/heads/master d90f2cf7a -> 1d59a4162


http://git-wip-us.apache.org/repos/asf/spark/blob/1d59a416/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 97beae2..aef940a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -620,6 +620,7 @@ class DataFrameSuite extends QueryTest with SQLTestUtils {
     assert(complexData.filter(complexData("m")("1") === 1).count() == 1)
     assert(complexData.filter(complexData("s")("key") === 1).count() == 1)
     assert(complexData.filter(complexData("m")(complexData("s")("value")) === 1).count() == 1)
+    assert(complexData.filter(complexData("a")(complexData("s")("key")) === 1).count() == 1)
   }
 
   test("SPARK-7551: support backticks for DataFrame attribute resolution") {

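The assertion added above exercises indexing an array column by another
column rather than by a literal ordinal. Against the public DataFrame API
the feature looks roughly like the sketch below (a minimal example assuming
a SQLContext named sqlContext with its implicits in scope, as in these
suites; Rec is a hypothetical case class for illustration):

    import sqlContext.implicits._

    case class Rec(a: Seq[Int], i: Int)
    val df = sqlContext.sparkContext.parallelize(Rec(Seq(10, 20, 30), 1) :: Nil).toDF()
    // a[i] is resolved per row: element 1 of Seq(10, 20, 30) is 20
    assert(df.filter(df("a")(df("i")) === 20).count() == 1)
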
http://git-wip-us.apache.org/repos/asf/spark/blob/1d59a416/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
index 01b7c21..8a679c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
@@ -19,9 +19,9 @@ package org.apache.spark.sql
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.execution.SparkSqlSerializer
-
 import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, SpecificMutableRow}
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 
 class RowSuite extends SparkFunSuite {
 
@@ -31,7 +31,7 @@ class RowSuite extends SparkFunSuite {
   test("create row") {
     val expected = new GenericMutableRow(4)
     expected.setInt(0, 2147483647)
-    expected.setString(1, "this is a string")
+    expected.update(1, UTF8String.fromString("this is a string"))
     expected.setBoolean(2, false)
     expected.setNullAt(3)
 

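The RowSuite change reflects that internal rows now hold strings in their
Catalyst representation, so the test writes a UTF8String into the slot
instead of calling setString. A minimal round-trip sketch using only the
UTF8String factory imported above:

    import org.apache.spark.unsafe.types.UTF8String

    val s = UTF8String.fromString("this is a string")  // JVM String -> Catalyst string
    assert(s.toString == "this is a string")           // and back, for comparison in tests
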
http://git-wip-us.apache.org/repos/asf/spark/blob/1d59a416/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 535011f..51fe9d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -581,42 +581,28 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
   }
 
   test("sorting") {
-    val before = sqlContext.conf.externalSortEnabled
-    sqlContext.setConf(SQLConf.EXTERNAL_SORT, false)
-    sortTest()
-    sqlContext.setConf(SQLConf.EXTERNAL_SORT, before)
+    withSQLConf(SQLConf.EXTERNAL_SORT.key -> "false") {
+      sortTest()
+    }
   }
 
   test("external sorting") {
-    val before = sqlContext.conf.externalSortEnabled
-    sqlContext.setConf(SQLConf.EXTERNAL_SORT, true)
-    sortTest()
-    sqlContext.setConf(SQLConf.EXTERNAL_SORT, before)
+    withSQLConf(SQLConf.EXTERNAL_SORT.key -> "true") {
+      sortTest()
+    }
   }
 
   test("SPARK-6927 sorting with codegen on") {
-    val externalbefore = sqlContext.conf.externalSortEnabled
-    val codegenbefore = sqlContext.conf.codegenEnabled
-    sqlContext.setConf(SQLConf.EXTERNAL_SORT, false)
-    sqlContext.setConf(SQLConf.CODEGEN_ENABLED, true)
-    try{
+    withSQLConf(SQLConf.EXTERNAL_SORT.key -> "false",
+      SQLConf.CODEGEN_ENABLED.key -> "true") {
       sortTest()
-    } finally {
-      sqlContext.setConf(SQLConf.EXTERNAL_SORT, externalbefore)
-      sqlContext.setConf(SQLConf.CODEGEN_ENABLED, codegenbefore)
     }
   }
 
   test("SPARK-6927 external sorting with codegen on") {
-    val externalbefore = sqlContext.conf.externalSortEnabled
-    val codegenbefore = sqlContext.conf.codegenEnabled
-    sqlContext.setConf(SQLConf.CODEGEN_ENABLED, true)
-    sqlContext.setConf(SQLConf.EXTERNAL_SORT, true)
-    try {
+    withSQLConf(SQLConf.EXTERNAL_SORT.key -> "true",
+      SQLConf.CODEGEN_ENABLED.key -> "true") {
       sortTest()
-    } finally {
-      sqlContext.setConf(SQLConf.EXTERNAL_SORT, externalbefore)
-      sqlContext.setConf(SQLConf.CODEGEN_ENABLED, codegenbefore)
     }
   }
 

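All four sorting tests collapse the manual save/set/restore dance into
withSQLConf, which restores the previous values even when sortTest() throws.
A standalone sketch of the pattern (withConf here is a hypothetical helper
over a plain mutable map; the real helper in SQLTestUtils works against
SQLConf):

    def withConf(conf: scala.collection.mutable.Map[String, String])
                (pairs: (String, String)*)(body: => Unit): Unit = {
      val keys = pairs.map(_._1)
      val previous = keys.map(conf.get)             // remember old values (None if unset)
      pairs.foreach { case (k, v) => conf(k) = v }  // apply the overrides
      try body
      finally keys.zip(previous).foreach {          // always restore, even on failure
        case (k, Some(v)) => conf(k) = v
        case (k, None)    => conf -= k
      }
    }
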
http://git-wip-us.apache.org/repos/asf/spark/blob/1d59a416/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
index e340f54..bd9729c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
@@ -190,8 +190,8 @@ object TestData {
   case class ComplexData(m: Map[String, Int], s: TestData, a: Seq[Int], b: Boolean)
   val complexData =
     TestSQLContext.sparkContext.parallelize(
-      ComplexData(Map("1" -> 1), TestData(1, "1"), Seq(1), true)
-        :: ComplexData(Map("2" -> 2), TestData(2, "2"), Seq(2), false)
+      ComplexData(Map("1" -> 1), TestData(1, "1"), Seq(1, 1, 1), true)
+        :: ComplexData(Map("2" -> 2), TestData(2, "2"), Seq(2, 2, 2), false)
         :: Nil).toDF()
   complexData.registerTempTable("complexData")
 }

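The rows grow to three elements because the new DataFrameSuite assertion
evaluates a(s.key) per row: with the old data, Seq(1) would be indexed at
position 1 and Seq(2) at position 2, both out of bounds. Repeating the
element keeps every touched index valid while preserving the expected value:

    val a = Seq(1, 1, 1)
    assert(a(1) == 1)   // the index used for the first row, where s.key = 1
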
http://git-wip-us.apache.org/repos/asf/spark/blob/1d59a416/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
index 77ed4a9..f299352 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
@@ -57,7 +57,7 @@ private[sql] class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] {
   override def deserialize(datum: Any): MyDenseVector = {
     datum match {
       case data: ArrayData =>
-        new MyDenseVector(data.toArray.map(_.asInstanceOf[Double]))
+        new MyDenseVector(data.toDoubleArray())
     }
   }
 

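The UDT deserializer now asks ArrayData for a primitive array directly
instead of materializing boxed elements and casting them one by one. A
standalone stand-in for the difference (ArrayData itself is internal;
toDoubleArray() is the helper the new code calls):

    val boxed: Array[Any] = Array[Any](1.0, 2.0, 3.0)               // what toArray yields: boxed values
    val viaCast: Array[Double] = boxed.map(_.asInstanceOf[Double])  // old path: casts element by element
    // toDoubleArray() instead copies straight into a primitive double[] in one pass
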
http://git-wip-us.apache.org/repos/asf/spark/blob/1d59a416/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 5926ef9..39d798d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -51,7 +51,7 @@ import scala.collection.JavaConversions._
  *     java.sql.Date
  *     java.sql.Timestamp
  *  Complex Types =>
- *    Map: scala.collection.immutable.Map
+ *    Map: [[org.apache.spark.sql.types.MapData]]
  *    List: [[org.apache.spark.sql.types.ArrayData]]
  *    Struct: [[org.apache.spark.sql.catalyst.InternalRow]]
  *    Union: NOT SUPPORTED YET
@@ -290,10 +290,10 @@ private[hive] trait HiveInspectors {
       DateTimeUtils.fromJavaDate(poi.getWritableConstantValue.get())
     case mi: StandardConstantMapObjectInspector =>
       // take the value from the map inspector object, rather than the input data
-      mi.getWritableConstantValue.map { case (k, v) =>
-        (unwrap(k, mi.getMapKeyObjectInspector),
-          unwrap(v, mi.getMapValueObjectInspector))
-      }.toMap
+      val map = mi.getWritableConstantValue
+      val keys = map.keysIterator.map(unwrap(_, mi.getMapKeyObjectInspector)).toArray
+      val values = map.valuesIterator.map(unwrap(_, mi.getMapValueObjectInspector)).toArray
+      ArrayBasedMapData(keys, values)
     case li: StandardConstantListObjectInspector =>
       // take the value from the list inspector object, rather than the input data
       val values = li.getWritableConstantValue
@@ -347,12 +347,14 @@ private[hive] trait HiveInspectors {
         }
         .orNull
     case mi: MapObjectInspector =>
-      Option(mi.getMap(data)).map(
-        _.map {
-          case (k, v) =>
-            (unwrap(k, mi.getMapKeyObjectInspector),
-              unwrap(v, mi.getMapValueObjectInspector))
-        }.toMap).orNull
+      val map = mi.getMap(data)
+      if (map == null) {
+        null
+      } else {
+        val keys = map.keysIterator.map(unwrap(_, mi.getMapKeyObjectInspector)).toArray
+        val values = map.valuesIterator.map(unwrap(_, mi.getMapValueObjectInspector)).toArray
+        ArrayBasedMapData(keys, values)
+      }
     // currently, hive doesn't provide the ConstantStructObjectInspector
     case si: StructObjectInspector =>
       val allRefs = si.getAllStructFieldRefs
@@ -365,7 +367,7 @@ private[hive] trait HiveInspectors {
    * Wraps with Hive types based on object inspector.
    * TODO: Consolidate all hive OI/data interface code.
    */
-  protected def wrapperFor(oi: ObjectInspector): Any => Any = oi match {
+  protected def wrapperFor(oi: ObjectInspector, dataType: DataType): Any => Any = oi match {
     case _: JavaHiveVarcharObjectInspector =>
       (o: Any) =>
         val s = o.asInstanceOf[UTF8String].toString
@@ -381,7 +383,10 @@ private[hive] trait HiveInspectors {
       (o: Any) => DateTimeUtils.toJavaTimestamp(o.asInstanceOf[Long])
 
     case soi: StandardStructObjectInspector =>
-      val wrappers = soi.getAllStructFieldRefs.map(ref => wrapperFor(ref.getFieldObjectInspector))
+      val schema = dataType.asInstanceOf[StructType]
+      val wrappers = soi.getAllStructFieldRefs.zip(schema.fields).map { case (ref, field) =>
+        wrapperFor(ref.getFieldObjectInspector, field.dataType)
+      }
       (o: Any) => {
         if (o != null) {
           val struct = soi.create()
@@ -395,27 +400,34 @@ private[hive] trait HiveInspectors {
       }
 
     case loi: ListObjectInspector =>
-      val wrapper = wrapperFor(loi.getListElementObjectInspector)
+      val elementType = dataType.asInstanceOf[ArrayType].elementType
+      val wrapper = wrapperFor(loi.getListElementObjectInspector, elementType)
       (o: Any) => {
         if (o != null) {
-          seqAsJavaList(o.asInstanceOf[ArrayData].toArray().map(wrapper))
+          val array = o.asInstanceOf[ArrayData]
+          val values = new java.util.ArrayList[Any](array.numElements())
+          array.foreach(elementType, (_, e) => {
+            values.add(wrapper(e))
+          })
+          values
         } else {
           null
         }
       }
 
     case moi: MapObjectInspector =>
-      // The Predef.Map is scala.collection.immutable.Map.
-      // Since the map values can be mutable, we explicitly import scala.collection.Map at here.
-      import scala.collection.Map
+      val mt = dataType.asInstanceOf[MapType]
+      val keyWrapper = wrapperFor(moi.getMapKeyObjectInspector, mt.keyType)
+      val valueWrapper = wrapperFor(moi.getMapValueObjectInspector, mt.valueType)
 
-      val keyWrapper = wrapperFor(moi.getMapKeyObjectInspector)
-      val valueWrapper = wrapperFor(moi.getMapValueObjectInspector)
       (o: Any) => {
         if (o != null) {
-          mapAsJavaMap(o.asInstanceOf[Map[_, _]].map { case (key, value) =>
-            keyWrapper(key) -> valueWrapper(value)
+          val map = o.asInstanceOf[MapData]
+          val jmap = new java.util.HashMap[Any, Any](map.numElements())
+          map.foreach(mt.keyType, mt.valueType, (k, v) => {
+            jmap.put(keyWrapper(k), valueWrapper(v))
           })
+          jmap
         } else {
           null
         }
@@ -531,18 +543,21 @@ private[hive] trait HiveInspectors {
     case x: ListObjectInspector =>
       val list = new java.util.ArrayList[Object]
       val tpe = dataType.asInstanceOf[ArrayType].elementType
-      a.asInstanceOf[ArrayData].toArray().foreach {
-        v => list.add(wrap(v, x.getListElementObjectInspector, tpe))
-      }
+      a.asInstanceOf[ArrayData].foreach(tpe, (_, e) => {
+        list.add(wrap(e, x.getListElementObjectInspector, tpe))
+      })
       list
     case x: MapObjectInspector =>
       val keyType = dataType.asInstanceOf[MapType].keyType
       val valueType = dataType.asInstanceOf[MapType].valueType
+      val map = a.asInstanceOf[MapData]
+
       // Some UDFs seem to assume we pass in a HashMap.
-      val hashMap = new java.util.HashMap[AnyRef, AnyRef]()
-      hashMap.putAll(a.asInstanceOf[Map[_, _]].map { case (k, v) =>
-        wrap(k, x.getMapKeyObjectInspector, keyType) ->
-          wrap(v, x.getMapValueObjectInspector, valueType)
+      val hashMap = new java.util.HashMap[Any, Any](map.numElements())
+
+      map.foreach(keyType, valueType, (k, v) => {
+        hashMap.put(wrap(k, x.getMapKeyObjectInspector, keyType),
+          wrap(v, x.getMapValueObjectInspector, valueType))
       })
 
       hashMap
@@ -645,8 +660,9 @@ private[hive] trait HiveInspectors {
         ObjectInspectorFactory.getStandardConstantListObjectInspector(listObjectInspector, null)
       } else {
         val list = new java.util.ArrayList[Object]()
-        value.asInstanceOf[ArrayData].toArray()
-          .foreach(v => list.add(wrap(v, listObjectInspector, dt)))
+        value.asInstanceOf[ArrayData].foreach(dt, (_, e) => {
+          list.add(wrap(e, listObjectInspector, dt))
+        })
         ObjectInspectorFactory.getStandardConstantListObjectInspector(listObjectInspector, list)
       }
     case Literal(value, MapType(keyType, valueType, _)) =>
@@ -655,11 +671,14 @@ private[hive] trait HiveInspectors {
       if (value == null) {
         ObjectInspectorFactory.getStandardConstantMapObjectInspector(keyOI, valueOI, null)
       } else {
-        val map = new java.util.HashMap[Object, Object]()
-        value.asInstanceOf[Map[_, _]].foreach (entry => {
-          map.put(wrap(entry._1, keyOI, keyType), wrap(entry._2, valueOI, valueType))
+        val map = value.asInstanceOf[MapData]
+        val jmap = new java.util.HashMap[Any, Any](map.numElements())
+
+        map.foreach(keyType, valueType, (k, v) => {
+          jmap.put(wrap(k, keyOI, keyType), wrap(v, valueOI, valueType))
         })
-        ObjectInspectorFactory.getStandardConstantMapObjectInspector(keyOI, valueOI, map)
+
+        ObjectInspectorFactory.getStandardConstantMapObjectInspector(keyOI, valueOI, jmap)
       }
     // We will enumerate all of the possible constant expressions, throw exception if we missed
     case Literal(_, dt) => sys.error(s"Hive doesn't support the constant type [$dt].")

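Throughout HiveInspectors the Catalyst map representation changes from
scala.collection.Map to MapData, built via ArrayBasedMapData(keys, values)
from two parallel arrays and consumed with a type-aware foreach (the real
signature also takes the key and value DataTypes so the backing storage
knows how to decode each slot). A simplified standalone model of the shape,
with SimpleMapData as a hypothetical stand-in:

    class SimpleMapData(keys: Array[Any], values: Array[Any]) {
      require(keys.length == values.length, "keys and values must be parallel arrays")
      def numElements: Int = keys.length
      def foreach(f: (Any, Any) => Unit): Unit = {
        var i = 0
        while (i < keys.length) { f(keys(i), values(i)); i += 1 }
      }
    }

    // Usage mirroring the new wrap path: copy into the java.util.HashMap some UDFs expect.
    val map = new SimpleMapData(Array[Any]("a", "b"), Array[Any](1, 2))
    val jmap = new java.util.HashMap[Any, Any](map.numElements)
    map.foreach((k, v) => jmap.put(k, v))

One payoff of the parallel-array layout is that keys never need hashing or
equality, which matters because Catalyst map types deliberately define
neither (see the HiveInspectorSuite change below).
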
http://git-wip-us.apache.org/repos/asf/spark/blob/1d59a416/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index e4944ca..40a6a32 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -95,9 +95,9 @@ case class InsertIntoHiveTable(
         .asInstanceOf[StructObjectInspector]
 
       val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray
-      val wrappers = fieldOIs.map(wrapperFor)
-      val outputData = new Array[Any](fieldOIs.length)
       val dataTypes: Array[DataType] = child.output.map(_.dataType).toArray
+      val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt)}
+      val outputData = new Array[Any](fieldOIs.length)
 
       writerContainer.executorSideSetup(context.stageId, context.partitionId, context.attemptNumber)
 

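Since wrapperFor now needs the Catalyst DataType alongside each Hive object
inspector (so it can recurse into structs, arrays, and maps with full type
information), every call site pairs the two up before building wrappers.
The zip pattern in isolation, with string stand-ins for the inspector and
type values:

    val fieldOIs  = Array("intOI", "stringOI")          // stand-ins for field object inspectors
    val dataTypes = Array("IntegerType", "StringType")  // stand-ins for Catalyst data types
    val wrappers = fieldOIs.zip(dataTypes).map { case (oi, dt) =>
      (value: Any) => s"wrap($value via $oi as $dt)"    // one wrapper closure per field
    }
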
http://git-wip-us.apache.org/repos/asf/spark/blob/1d59a416/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 4a13022..abe5c69 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -428,10 +428,10 @@ private[hive] case class HiveWindowFunction(
       // if pivotResult is false, we will get a single value for all rows in the frame.
       outputBuffer
     } else {
-      // if pivotResult is true, we will get a Seq having the same size with the size
+      // if pivotResult is true, we will get a ArrayData having the same size with the size
       // of the window frame. At here, we will return the result at the position of
       // index in the output buffer.
-      outputBuffer.asInstanceOf[ArrayData].get(index)
+      outputBuffer.asInstanceOf[ArrayData].get(index, dataType)
     }
   }
 

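ArrayData.get now takes the element DataType because a compact array
encoding cannot decode a slot without knowing what is stored there. A
hypothetical decoder illustrating the idea (BytesArray is not the real
ArrayData, whose get takes a Catalyst DataType):

    import java.nio.ByteBuffer

    class BytesArray(slots: Array[Array[Byte]]) {
      def get(index: Int, dataType: String): Any = dataType match {
        case "int"    => ByteBuffer.wrap(slots(index)).getInt     // read 4 bytes as an Int
        case "double" => ByteBuffer.wrap(slots(index)).getDouble  // read 8 bytes as a Double
      }
    }
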
http://git-wip-us.apache.org/repos/asf/spark/blob/1d59a416/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 924f4d3..6fa5997 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -95,9 +95,10 @@ private[orc] class OrcOutputWriter(
   private val reusableOutputBuffer = new Array[Any](dataSchema.length)
 
   // Used to convert Catalyst values into Hadoop `Writable`s.
-  private val wrappers = structOI.getAllStructFieldRefs.map { ref =>
-    wrapperFor(ref.getFieldObjectInspector)
-  }.toArray
+  private val wrappers = structOI.getAllStructFieldRefs.zip(dataSchema.fields.map(_.dataType))
+    .map { case (ref, dt) =>
+      wrapperFor(ref.getFieldObjectInspector, dt)
+    }.toArray
 
   // `OrcRecordWriter.close()` creates an empty file if no rows are written at all.  We use this
   // flag to decide whether `OrcRecordWriter.close()` needs to be called.

http://git-wip-us.apache.org/repos/asf/spark/blob/1d59a416/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala
index f719f2e..99e95fb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala
@@ -147,6 +147,8 @@ class HiveInspectorSuite extends SparkFunSuite with HiveInspectors {
       case (r1: Array[Byte], r2: Array[Byte])
         if r1 != null && r2 != null && r1.length == r2.length =>
         r1.zip(r2).foreach { case (b1, b2) => assert(b1 === b2) }
+      // We don't support equality & ordering for map type, so skip it.
+      case (r1: MapData, r2: MapData) =>
       case (r1, r2) => assert(r1 === r2)
     }
   }
@@ -230,7 +232,7 @@ class HiveInspectorSuite extends SparkFunSuite with HiveInspectors {
   test("wrap / unwrap Map Type") {
     val dt = MapType(dataTypes(0), dataTypes(1))
 
-    val d = Map(row(0) -> row(1))
+    val d = ArrayBasedMapData(Array(row(0)), Array(row(1)))
     checkValue(d, unwrap(wrap(d, toInspector(dt), dt), toInspector(dt)))
     checkValue(null, unwrap(wrap(null, toInspector(dt), dt), toInspector(dt)))
     checkValue(d,

