Repository: spark
Updated Branches:
  refs/heads/master d90f2cf7a -> 1d59a4162
http://git-wip-us.apache.org/repos/asf/spark/blob/1d59a416/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 97beae2..aef940a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -620,6 +620,7 @@ class DataFrameSuite extends QueryTest with SQLTestUtils {
     assert(complexData.filter(complexData("m")("1") === 1).count() == 1)
     assert(complexData.filter(complexData("s")("key") === 1).count() == 1)
     assert(complexData.filter(complexData("m")(complexData("s")("value")) === 1).count() == 1)
+    assert(complexData.filter(complexData("a")(complexData("s")("key")) === 1).count() == 1)
   }
 
   test("SPARK-7551: support backticks for DataFrame attribute resolution") {

http://git-wip-us.apache.org/repos/asf/spark/blob/1d59a416/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
index 01b7c21..8a679c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
@@ -19,9 +19,9 @@ package org.apache.spark.sql
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.execution.SparkSqlSerializer
-
 import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, SpecificMutableRow}
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 
 class RowSuite extends SparkFunSuite {
 
@@ -31,7 +31,7 @@ class RowSuite extends SparkFunSuite {
   test("create row") {
     val expected = new GenericMutableRow(4)
     expected.setInt(0, 2147483647)
-    expected.setString(1, "this is a string")
+    expected.update(1, UTF8String.fromString("this is a string"))
     expected.setBoolean(2, false)
     expected.setNullAt(3)

http://git-wip-us.apache.org/repos/asf/spark/blob/1d59a416/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 535011f..51fe9d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -581,42 +581,28 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
   }
 
   test("sorting") {
-    val before = sqlContext.conf.externalSortEnabled
-    sqlContext.setConf(SQLConf.EXTERNAL_SORT, false)
-    sortTest()
-    sqlContext.setConf(SQLConf.EXTERNAL_SORT, before)
+    withSQLConf(SQLConf.EXTERNAL_SORT.key -> "false") {
+      sortTest()
+    }
   }
 
   test("external sorting") {
-    val before = sqlContext.conf.externalSortEnabled
-    sqlContext.setConf(SQLConf.EXTERNAL_SORT, true)
-    sortTest()
-    sqlContext.setConf(SQLConf.EXTERNAL_SORT, before)
+    withSQLConf(SQLConf.EXTERNAL_SORT.key -> "true") {
+      sortTest()
+    }
   }
 
   test("SPARK-6927 sorting with codegen on") {
-    val externalbefore = sqlContext.conf.externalSortEnabled
-    val codegenbefore = sqlContext.conf.codegenEnabled
-    sqlContext.setConf(SQLConf.EXTERNAL_SORT, false)
-    sqlContext.setConf(SQLConf.CODEGEN_ENABLED, true)
-    try{
+    withSQLConf(SQLConf.EXTERNAL_SORT.key -> "false",
+      SQLConf.CODEGEN_ENABLED.key -> "true") {
       sortTest()
-    } finally {
-      sqlContext.setConf(SQLConf.EXTERNAL_SORT, externalbefore)
-      sqlContext.setConf(SQLConf.CODEGEN_ENABLED, codegenbefore)
     }
   }
 
   test("SPARK-6927 external sorting with codegen on") {
-    val externalbefore = sqlContext.conf.externalSortEnabled
-    val codegenbefore = sqlContext.conf.codegenEnabled
-    sqlContext.setConf(SQLConf.CODEGEN_ENABLED, true)
-    sqlContext.setConf(SQLConf.EXTERNAL_SORT, true)
-    try {
+    withSQLConf(SQLConf.EXTERNAL_SORT.key -> "true",
+      SQLConf.CODEGEN_ENABLED.key -> "true") {
       sortTest()
-    } finally {
-      sqlContext.setConf(SQLConf.EXTERNAL_SORT, externalbefore)
-      sqlContext.setConf(SQLConf.CODEGEN_ENABLED, codegenbefore)
    }
  }
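All four tests above replace the manual save/set/run/restore of SQLConf values with withSQLConf. A minimal sketch of the pattern such a helper encapsulates, using a plain mutable map as a stand-in for SQLConf and a hypothetical withConf name (this is not the actual SQLTestUtils implementation):

  import scala.collection.mutable

  object ConfPatternSketch {
    // Stand-in for SQLConf: just a string-to-string settings map.
    val conf = mutable.Map[String, String]()

    // Set the given keys, run the block, then restore the previous values,
    // even if the block throws -- the same guarantee the old try/finally gave.
    def withConf[T](pairs: (String, String)*)(block: => T): T = {
      val saved = pairs.map { case (k, _) => k -> conf.get(k) }
      pairs.foreach { case (k, v) => conf(k) = v }
      try block finally {
        saved.foreach {
          case (k, Some(old)) => conf(k) = old
          case (k, None) => conf.remove(k)
        }
      }
    }

    // Usage, mirroring the tests above (the key name is illustrative only):
    // withConf("spark.sql.someFlag" -> "false") { runSortTest() }
  }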
http://git-wip-us.apache.org/repos/asf/spark/blob/1d59a416/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
index e340f54..bd9729c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
@@ -190,8 +190,8 @@ object TestData {
 
   case class ComplexData(m: Map[String, Int], s: TestData, a: Seq[Int], b: Boolean)
   val complexData = TestSQLContext.sparkContext.parallelize(
-    ComplexData(Map("1" -> 1), TestData(1, "1"), Seq(1), true)
-      :: ComplexData(Map("2" -> 2), TestData(2, "2"), Seq(2), false)
+    ComplexData(Map("1" -> 1), TestData(1, "1"), Seq(1, 1, 1), true)
+      :: ComplexData(Map("2" -> 2), TestData(2, "2"), Seq(2, 2, 2), false)
       :: Nil).toDF()
   complexData.registerTempTable("complexData")
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1d59a416/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
index 77ed4a9..f299352 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
@@ -57,7 +57,7 @@ private[sql] class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] {
   override def deserialize(datum: Any): MyDenseVector = {
     datum match {
       case data: ArrayData =>
-        new MyDenseVector(data.toArray.map(_.asInstanceOf[Double]))
+        new MyDenseVector(data.toDoubleArray())
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/1d59a416/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 5926ef9..39d798d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -51,7 +51,7 @@ import scala.collection.JavaConversions._
  *     java.sql.Date
  *     java.sql.Timestamp
  *   Complex Types =>
- *     Map: scala.collection.immutable.Map
+ *     Map: [[org.apache.spark.sql.types.MapData]]
  *     List: [[org.apache.spark.sql.types.ArrayData]]
  *     Struct: [[org.apache.spark.sql.catalyst.InternalRow]]
  *     Union: NOT SUPPORTED YET
@@ -290,10 +290,10 @@ private[hive] trait HiveInspectors {
       DateTimeUtils.fromJavaDate(poi.getWritableConstantValue.get())
     case mi: StandardConstantMapObjectInspector =>
       // take the value from the map inspector object, rather than the input data
-      mi.getWritableConstantValue.map { case (k, v) =>
-        (unwrap(k, mi.getMapKeyObjectInspector),
-          unwrap(v, mi.getMapValueObjectInspector))
-      }.toMap
+      val map = mi.getWritableConstantValue
+      val keys = map.keysIterator.map(unwrap(_, mi.getMapKeyObjectInspector)).toArray
+      val values = map.valuesIterator.map(unwrap(_, mi.getMapValueObjectInspector)).toArray
+      ArrayBasedMapData(keys, values)
     case li: StandardConstantListObjectInspector =>
       // take the value from the list inspector object, rather than the input data
       val values = li.getWritableConstantValue
@@ -347,12 +347,14 @@ private[hive] trait HiveInspectors {
         }
         .orNull
     case mi: MapObjectInspector =>
-      Option(mi.getMap(data)).map(
-        _.map {
-          case (k, v) =>
-            (unwrap(k, mi.getMapKeyObjectInspector),
-              unwrap(v, mi.getMapValueObjectInspector))
-        }.toMap).orNull
+      val map = mi.getMap(data)
+      if (map == null) {
+        null
+      } else {
+        val keys = map.keysIterator.map(unwrap(_, mi.getMapKeyObjectInspector)).toArray
+        val values = map.valuesIterator.map(unwrap(_, mi.getMapValueObjectInspector)).toArray
+        ArrayBasedMapData(keys, values)
+      }
     // currently, hive doesn't provide the ConstantStructObjectInspector
     case si: StructObjectInspector =>
       val allRefs = si.getAllStructFieldRefs
@@ -365,7 +367,7 @@ private[hive] trait HiveInspectors {
    * Wraps with Hive types based on object inspector.
    * TODO: Consolidate all hive OI/data interface code.
    */
-  protected def wrapperFor(oi: ObjectInspector): Any => Any = oi match {
+  protected def wrapperFor(oi: ObjectInspector, dataType: DataType): Any => Any = oi match {
     case _: JavaHiveVarcharObjectInspector =>
       (o: Any) =>
         val s = o.asInstanceOf[UTF8String].toString
@@ -381,7 +383,10 @@ private[hive] trait HiveInspectors {
       (o: Any) => DateTimeUtils.toJavaTimestamp(o.asInstanceOf[Long])
 
     case soi: StandardStructObjectInspector =>
-      val wrappers = soi.getAllStructFieldRefs.map(ref => wrapperFor(ref.getFieldObjectInspector))
+      val schema = dataType.asInstanceOf[StructType]
+      val wrappers = soi.getAllStructFieldRefs.zip(schema.fields).map { case (ref, field) =>
+        wrapperFor(ref.getFieldObjectInspector, field.dataType)
+      }
       (o: Any) => {
         if (o != null) {
           val struct = soi.create()
@@ -395,27 +400,34 @@ private[hive] trait HiveInspectors {
       }
 
     case loi: ListObjectInspector =>
-      val wrapper = wrapperFor(loi.getListElementObjectInspector)
+      val elementType = dataType.asInstanceOf[ArrayType].elementType
+      val wrapper = wrapperFor(loi.getListElementObjectInspector, elementType)
       (o: Any) => {
         if (o != null) {
-          seqAsJavaList(o.asInstanceOf[ArrayData].toArray().map(wrapper))
+          val array = o.asInstanceOf[ArrayData]
+          val values = new java.util.ArrayList[Any](array.numElements())
+          array.foreach(elementType, (_, e) => {
+            values.add(wrapper(e))
+          })
+          values
         } else {
           null
         }
       }
 
     case moi: MapObjectInspector =>
-      // The Predef.Map is scala.collection.immutable.Map.
-      // Since the map values can be mutable, we explicitly import scala.collection.Map at here.
-      import scala.collection.Map
+      val mt = dataType.asInstanceOf[MapType]
+      val keyWrapper = wrapperFor(moi.getMapKeyObjectInspector, mt.keyType)
+      val valueWrapper = wrapperFor(moi.getMapValueObjectInspector, mt.valueType)
 
-      val keyWrapper = wrapperFor(moi.getMapKeyObjectInspector)
-      val valueWrapper = wrapperFor(moi.getMapValueObjectInspector)
       (o: Any) => {
         if (o != null) {
-          mapAsJavaMap(o.asInstanceOf[Map[_, _]].map { case (key, value) =>
-            keyWrapper(key) -> valueWrapper(value)
+          val map = o.asInstanceOf[MapData]
+          val jmap = new java.util.HashMap[Any, Any](map.numElements())
+          map.foreach(mt.keyType, mt.valueType, (k, v) => {
+            jmap.put(keyWrapper(k), valueWrapper(v))
           })
+          jmap
         } else {
           null
         }
@@ -531,18 +543,21 @@ private[hive] trait HiveInspectors {
     case x: ListObjectInspector =>
       val list = new java.util.ArrayList[Object]
       val tpe = dataType.asInstanceOf[ArrayType].elementType
-      a.asInstanceOf[ArrayData].toArray().foreach {
-        v => list.add(wrap(v, x.getListElementObjectInspector, tpe))
-      }
+      a.asInstanceOf[ArrayData].foreach(tpe, (_, e) => {
+        list.add(wrap(e, x.getListElementObjectInspector, tpe))
+      })
       list
     case x: MapObjectInspector =>
       val keyType = dataType.asInstanceOf[MapType].keyType
       val valueType = dataType.asInstanceOf[MapType].valueType
+      val map = a.asInstanceOf[MapData]
+
       // Some UDFs seem to assume we pass in a HashMap.
-      val hashMap = new java.util.HashMap[AnyRef, AnyRef]()
-      hashMap.putAll(a.asInstanceOf[Map[_, _]].map { case (k, v) =>
-        wrap(k, x.getMapKeyObjectInspector, keyType) ->
-          wrap(v, x.getMapValueObjectInspector, valueType)
+      val hashMap = new java.util.HashMap[Any, Any](map.numElements())
+
+      map.foreach(keyType, valueType, (k, v) => {
+        hashMap.put(wrap(k, x.getMapKeyObjectInspector, keyType),
+          wrap(v, x.getMapValueObjectInspector, valueType))
       })
 
       hashMap
@@ -645,8 +660,9 @@ private[hive] trait HiveInspectors {
         ObjectInspectorFactory.getStandardConstantListObjectInspector(listObjectInspector, null)
       } else {
         val list = new java.util.ArrayList[Object]()
-        value.asInstanceOf[ArrayData].toArray()
-          .foreach(v => list.add(wrap(v, listObjectInspector, dt)))
+        value.asInstanceOf[ArrayData].foreach(dt, (_, e) => {
+          list.add(wrap(e, listObjectInspector, dt))
+        })
         ObjectInspectorFactory.getStandardConstantListObjectInspector(listObjectInspector, list)
       }
     case Literal(value, MapType(keyType, valueType, _)) =>
@@ -655,11 +671,14 @@ private[hive] trait HiveInspectors {
       if (value == null) {
         ObjectInspectorFactory.getStandardConstantMapObjectInspector(keyOI, valueOI, null)
       } else {
-        val map = new java.util.HashMap[Object, Object]()
-        value.asInstanceOf[Map[_, _]].foreach (entry => {
-          map.put(wrap(entry._1, keyOI, keyType), wrap(entry._2, valueOI, valueType))
+        val map = value.asInstanceOf[MapData]
+        val jmap = new java.util.HashMap[Any, Any](map.numElements())
+
+        map.foreach(keyType, valueType, (k, v) => {
+          jmap.put(wrap(k, keyOI, keyType), wrap(v, valueOI, valueType))
         })
-        ObjectInspectorFactory.getStandardConstantMapObjectInspector(keyOI, valueOI, map)
+
+        ObjectInspectorFactory.getStandardConstantMapObjectInspector(keyOI, valueOI, jmap)
       }
     // We will enumerate all of the possible constant expressions, throw exception if we missed
     case Literal(_, dt) => sys.error(s"Hive doesn't support the constant type [$dt].")
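The thread running through the HiveInspectors hunks above is that Catalyst maps are no longer scala.collection.Map values: unwrap now builds an ArrayBasedMapData from parallel key/value arrays, and the wrappers read a MapData back out via foreach(keyType, valueType, f), since MapData carries no element types of its own. A standalone sketch of the two directions, using only the calls that appear in the diff (the imports follow the org.apache.spark.sql.types package named in the updated doc comment; in later Spark versions these classes live under org.apache.spark.sql.catalyst.util):

  import org.apache.spark.sql.types.{ArrayBasedMapData, IntegerType, MapData}

  object MapDataSketch {
    // JVM/Hive side -> Catalyst side: collect keys and values into parallel arrays
    // and wrap them, as the new unwrap branches do (the real code first passes each
    // key and value through unwrap() with its ObjectInspector; omitted here).
    def toCatalystMap(m: Map[Int, Int]): MapData = {
      val keys: Array[Any] = m.keysIterator.map(k => k: Any).toArray
      val values: Array[Any] = m.valuesIterator.map(v => v: Any).toArray
      ArrayBasedMapData(keys, values)
    }

    // Catalyst side -> JVM/Hive side: a MapData is traversed with
    // foreach(keyType, valueType, f), as the map wrapper and wrap() now do.
    def toJavaMap(map: MapData): java.util.HashMap[Any, Any] = {
      val jmap = new java.util.HashMap[Any, Any](map.numElements())
      map.foreach(IntegerType, IntegerType, (k, v) => jmap.put(k, v))
      jmap
    }
  }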
http://git-wip-us.apache.org/repos/asf/spark/blob/1d59a416/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index e4944ca..40a6a32 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -95,9 +95,9 @@ case class InsertIntoHiveTable(
       .asInstanceOf[StructObjectInspector]
 
     val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray
-    val wrappers = fieldOIs.map(wrapperFor)
-    val outputData = new Array[Any](fieldOIs.length)
     val dataTypes: Array[DataType] = child.output.map(_.dataType).toArray
+    val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt)}
+    val outputData = new Array[Any](fieldOIs.length)
 
     writerContainer.executorSideSetup(context.stageId, context.partitionId, context.attemptNumber)

http://git-wip-us.apache.org/repos/asf/spark/blob/1d59a416/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 4a13022..abe5c69 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -428,10 +428,10 @@ private[hive] case class HiveWindowFunction(
       // if pivotResult is false, we will get a single value for all rows in the frame.
       outputBuffer
     } else {
-      // if pivotResult is true, we will get a Seq having the same size with the size
+      // if pivotResult is true, we will get a ArrayData having the same size with the size
       // of the window frame. At here, we will return the result at the position of
       // index in the output buffer.
-      outputBuffer.asInstanceOf[ArrayData].get(index)
+      outputBuffer.asInstanceOf[ArrayData].get(index, dataType)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/1d59a416/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 924f4d3..6fa5997 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -95,9 +95,10 @@ private[orc] class OrcOutputWriter(
   private val reusableOutputBuffer = new Array[Any](dataSchema.length)
 
   // Used to convert Catalyst values into Hadoop `Writable`s.
-  private val wrappers = structOI.getAllStructFieldRefs.map { ref =>
-    wrapperFor(ref.getFieldObjectInspector)
-  }.toArray
+  private val wrappers = structOI.getAllStructFieldRefs.zip(dataSchema.fields.map(_.dataType))
+    .map { case (ref, dt) =>
+      wrapperFor(ref.getFieldObjectInspector, dt)
+    }.toArray
 
   // `OrcRecordWriter.close()` creates an empty file if no rows are written at all. We use this
   // flag to decide whether `OrcRecordWriter.close()` needs to be called.
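InsertIntoHiveTable and OrcOutputWriter change in the same way: wrapperFor now takes the Catalyst DataType next to the ObjectInspector, because ArrayData and MapData can only be traversed when the element, key, and value types are supplied from the outside. A small sketch of the caller-side shape, with hypothetical stand-in types (the real field references come from Hive's object inspector API; only the zip-with-schema pattern is the point):

  import org.apache.spark.sql.types.{DataType, StructType}

  object WrapperWiringSketch {
    // Stand-in for a Hive struct field reference, just to show the shape of the change.
    trait FieldRef { def getFieldObjectInspector: AnyRef }

    // Old shape: one wrapper per inspector, type information implicit.
    def wrappersOld(refs: Seq[FieldRef])(wrapperFor: AnyRef => Any => Any): Array[Any => Any] =
      refs.map(r => wrapperFor(r.getFieldObjectInspector)).toArray

    // New shape: zip inspectors with the Catalyst schema so every wrapper also
    // knows which DataType it is converting, as both call sites above now do.
    def wrappersNew(refs: Seq[FieldRef], schema: StructType)(
        wrapperFor: (AnyRef, DataType) => Any => Any): Array[Any => Any] =
      refs.zip(schema.fields).map { case (r, f) =>
        wrapperFor(r.getFieldObjectInspector, f.dataType)
      }.toArray
  }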
http://git-wip-us.apache.org/repos/asf/spark/blob/1d59a416/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala
index f719f2e..99e95fb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala
@@ -147,6 +147,8 @@ class HiveInspectorSuite extends SparkFunSuite with HiveInspectors {
       case (r1: Array[Byte], r2: Array[Byte])
         if r1 != null && r2 != null && r1.length == r2.length =>
         r1.zip(r2).foreach { case (b1, b2) => assert(b1 === b2) }
+      // We don't support equality & ordering for map type, so skip it.
+      case (r1: MapData, r2: MapData) =>
       case (r1, r2) => assert(r1 === r2)
     }
   }
@@ -230,7 +232,7 @@ class HiveInspectorSuite extends SparkFunSuite with HiveInspectors {
 
   test("wrap / unwrap Map Type") {
     val dt = MapType(dataTypes(0), dataTypes(1))
-    val d = Map(row(0) -> row(1))
+    val d = ArrayBasedMapData(Array(row(0)), Array(row(1)))
     checkValue(d, unwrap(wrap(d, toInspector(dt), dt), toInspector(dt)))
     checkValue(null, unwrap(wrap(null, toInspector(dt), dt), toInspector(dt)))
     checkValue(d,