http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala deleted file mode 100644 index 92022ff..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala +++ /dev/null @@ -1,1172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.json - -import java.io.{File, StringWriter} -import java.sql.{Date, Timestamp} - -import com.fasterxml.jackson.core.JsonFactory -import org.apache.spark.rdd.RDD -import org.scalactic.Tolerance._ - -import org.apache.spark.sql.{SQLContext, QueryTest, Row, SQLConf} -import org.apache.spark.sql.TestData._ -import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation} -import org.apache.spark.sql.json.InferSchema.compatibleType -import org.apache.spark.sql.types._ -import org.apache.spark.sql.test.SQLTestUtils -import org.apache.spark.util.Utils - -class JsonSuite extends QueryTest with SQLTestUtils with TestJsonData { - - protected lazy val ctx = org.apache.spark.sql.test.TestSQLContext - override def sqlContext: SQLContext = ctx // used by SQLTestUtils - - import ctx.sql - import ctx.implicits._ - - test("Type promotion") { - def checkTypePromotion(expected: Any, actual: Any) { - assert(expected.getClass == actual.getClass, - s"Failed to promote ${actual.getClass} to ${expected.getClass}.") - assert(expected == actual, - s"Promoted value ${actual}(${actual.getClass}) does not equal the expected value " + - s"${expected}(${expected.getClass}).") - } - - val factory = new JsonFactory() - def enforceCorrectType(value: Any, dataType: DataType): Any = { - val writer = new StringWriter() - val generator = factory.createGenerator(writer) - generator.writeObject(value) - generator.flush() - - val parser = factory.createParser(writer.toString) - parser.nextToken() - JacksonParser.convertField(factory, parser, dataType) - } - - val intNumber: Int = 2147483647 - checkTypePromotion(intNumber, enforceCorrectType(intNumber, IntegerType)) - checkTypePromotion(intNumber.toLong, enforceCorrectType(intNumber, LongType)) - checkTypePromotion(intNumber.toDouble, enforceCorrectType(intNumber, DoubleType)) - checkTypePromotion( - Decimal(intNumber), enforceCorrectType(intNumber, DecimalType.SYSTEM_DEFAULT)) - - val longNumber: Long = 9223372036854775807L - checkTypePromotion(longNumber, enforceCorrectType(longNumber, LongType)) - checkTypePromotion(longNumber.toDouble, enforceCorrectType(longNumber, DoubleType)) - 
checkTypePromotion( - Decimal(longNumber), enforceCorrectType(longNumber, DecimalType.SYSTEM_DEFAULT)) - - val doubleNumber: Double = 1.7976931348623157E308d - checkTypePromotion(doubleNumber.toDouble, enforceCorrectType(doubleNumber, DoubleType)) - - checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber)), - enforceCorrectType(intNumber, TimestampType)) - checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber.toLong)), - enforceCorrectType(intNumber.toLong, TimestampType)) - val strTime = "2014-09-30 12:34:56" - checkTypePromotion(DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)), - enforceCorrectType(strTime, TimestampType)) - - val strDate = "2014-10-15" - checkTypePromotion( - DateTimeUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType)) - - val ISO8601Time1 = "1970-01-01T01:00:01.0Z" - checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(3601000)), - enforceCorrectType(ISO8601Time1, TimestampType)) - checkTypePromotion(DateTimeUtils.millisToDays(3601000), - enforceCorrectType(ISO8601Time1, DateType)) - val ISO8601Time2 = "1970-01-01T02:00:01-01:00" - checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(10801000)), - enforceCorrectType(ISO8601Time2, TimestampType)) - checkTypePromotion(DateTimeUtils.millisToDays(10801000), - enforceCorrectType(ISO8601Time2, DateType)) - } - - test("Get compatible type") { - def checkDataType(t1: DataType, t2: DataType, expected: DataType) { - var actual = compatibleType(t1, t2) - assert(actual == expected, - s"Expected $expected as the most general data type for $t1 and $t2, found $actual") - actual = compatibleType(t2, t1) - assert(actual == expected, - s"Expected $expected as the most general data type for $t1 and $t2, found $actual") - } - - // NullType - checkDataType(NullType, BooleanType, BooleanType) - checkDataType(NullType, IntegerType, IntegerType) - checkDataType(NullType, LongType, LongType) - checkDataType(NullType, DoubleType, DoubleType) - checkDataType(NullType, DecimalType.SYSTEM_DEFAULT, DecimalType.SYSTEM_DEFAULT) - checkDataType(NullType, StringType, StringType) - checkDataType(NullType, ArrayType(IntegerType), ArrayType(IntegerType)) - checkDataType(NullType, StructType(Nil), StructType(Nil)) - checkDataType(NullType, NullType, NullType) - - // BooleanType - checkDataType(BooleanType, BooleanType, BooleanType) - checkDataType(BooleanType, IntegerType, StringType) - checkDataType(BooleanType, LongType, StringType) - checkDataType(BooleanType, DoubleType, StringType) - checkDataType(BooleanType, DecimalType.SYSTEM_DEFAULT, StringType) - checkDataType(BooleanType, StringType, StringType) - checkDataType(BooleanType, ArrayType(IntegerType), StringType) - checkDataType(BooleanType, StructType(Nil), StringType) - - // IntegerType - checkDataType(IntegerType, IntegerType, IntegerType) - checkDataType(IntegerType, LongType, LongType) - checkDataType(IntegerType, DoubleType, DoubleType) - checkDataType(IntegerType, DecimalType.SYSTEM_DEFAULT, DecimalType.SYSTEM_DEFAULT) - checkDataType(IntegerType, StringType, StringType) - checkDataType(IntegerType, ArrayType(IntegerType), StringType) - checkDataType(IntegerType, StructType(Nil), StringType) - - // LongType - checkDataType(LongType, LongType, LongType) - checkDataType(LongType, DoubleType, DoubleType) - checkDataType(LongType, DecimalType.SYSTEM_DEFAULT, DecimalType.SYSTEM_DEFAULT) - checkDataType(LongType, StringType, StringType) - checkDataType(LongType, ArrayType(IntegerType), 
StringType) - checkDataType(LongType, StructType(Nil), StringType) - - // DoubleType - checkDataType(DoubleType, DoubleType, DoubleType) - checkDataType(DoubleType, DecimalType.SYSTEM_DEFAULT, DoubleType) - checkDataType(DoubleType, StringType, StringType) - checkDataType(DoubleType, ArrayType(IntegerType), StringType) - checkDataType(DoubleType, StructType(Nil), StringType) - - // DecimalType - checkDataType(DecimalType.SYSTEM_DEFAULT, DecimalType.SYSTEM_DEFAULT, - DecimalType.SYSTEM_DEFAULT) - checkDataType(DecimalType.SYSTEM_DEFAULT, StringType, StringType) - checkDataType(DecimalType.SYSTEM_DEFAULT, ArrayType(IntegerType), StringType) - checkDataType(DecimalType.SYSTEM_DEFAULT, StructType(Nil), StringType) - - // StringType - checkDataType(StringType, StringType, StringType) - checkDataType(StringType, ArrayType(IntegerType), StringType) - checkDataType(StringType, StructType(Nil), StringType) - - // ArrayType - checkDataType(ArrayType(IntegerType), ArrayType(IntegerType), ArrayType(IntegerType)) - checkDataType(ArrayType(IntegerType), ArrayType(LongType), ArrayType(LongType)) - checkDataType(ArrayType(IntegerType), ArrayType(StringType), ArrayType(StringType)) - checkDataType(ArrayType(IntegerType), StructType(Nil), StringType) - checkDataType( - ArrayType(IntegerType, true), ArrayType(IntegerType), ArrayType(IntegerType, true)) - checkDataType( - ArrayType(IntegerType, true), ArrayType(IntegerType, false), ArrayType(IntegerType, true)) - checkDataType( - ArrayType(IntegerType, true), ArrayType(IntegerType, true), ArrayType(IntegerType, true)) - checkDataType( - ArrayType(IntegerType, false), ArrayType(IntegerType), ArrayType(IntegerType, true)) - checkDataType( - ArrayType(IntegerType, false), ArrayType(IntegerType, false), ArrayType(IntegerType, false)) - checkDataType( - ArrayType(IntegerType, false), ArrayType(IntegerType, true), ArrayType(IntegerType, true)) - - // StructType - checkDataType(StructType(Nil), StructType(Nil), StructType(Nil)) - checkDataType( - StructType(StructField("f1", IntegerType, true) :: Nil), - StructType(StructField("f1", IntegerType, true) :: Nil), - StructType(StructField("f1", IntegerType, true) :: Nil)) - checkDataType( - StructType(StructField("f1", IntegerType, true) :: Nil), - StructType(Nil), - StructType(StructField("f1", IntegerType, true) :: Nil)) - checkDataType( - StructType( - StructField("f1", IntegerType, true) :: - StructField("f2", IntegerType, true) :: Nil), - StructType(StructField("f1", LongType, true) :: Nil) , - StructType( - StructField("f1", LongType, true) :: - StructField("f2", IntegerType, true) :: Nil)) - checkDataType( - StructType( - StructField("f1", IntegerType, true) :: Nil), - StructType( - StructField("f2", IntegerType, true) :: Nil), - StructType( - StructField("f1", IntegerType, true) :: - StructField("f2", IntegerType, true) :: Nil)) - checkDataType( - StructType( - StructField("f1", IntegerType, true) :: Nil), - DecimalType.SYSTEM_DEFAULT, - StringType) - } - - test("Complex field and type inferring with null in sampling") { - val jsonDF = ctx.read.json(jsonNullStruct) - val expectedSchema = StructType( - StructField("headers", StructType( - StructField("Charset", StringType, true) :: - StructField("Host", StringType, true) :: Nil) - , true) :: - StructField("ip", StringType, true) :: - StructField("nullstr", StringType, true):: Nil) - - assert(expectedSchema === jsonDF.schema) - jsonDF.registerTempTable("jsonTable") - - checkAnswer( - sql("select nullstr, headers.Host from jsonTable"), - Seq(Row("", "1.abc.com"), 
Row("", null), Row("", null), Row(null, null)) - ) - } - - test("Primitive field and type inferring") { - val jsonDF = ctx.read.json(primitiveFieldAndType) - - val expectedSchema = StructType( - StructField("bigInteger", DecimalType(20, 0), true) :: - StructField("boolean", BooleanType, true) :: - StructField("double", DoubleType, true) :: - StructField("integer", LongType, true) :: - StructField("long", LongType, true) :: - StructField("null", StringType, true) :: - StructField("string", StringType, true) :: Nil) - - assert(expectedSchema === jsonDF.schema) - - jsonDF.registerTempTable("jsonTable") - - checkAnswer( - sql("select * from jsonTable"), - Row(new java.math.BigDecimal("92233720368547758070"), - true, - 1.7976931348623157E308, - 10, - 21474836470L, - null, - "this is a simple string.") - ) - } - - test("Complex field and type inferring") { - val jsonDF = ctx.read.json(complexFieldAndType1) - - val expectedSchema = StructType( - StructField("arrayOfArray1", ArrayType(ArrayType(StringType, true), true), true) :: - StructField("arrayOfArray2", ArrayType(ArrayType(DoubleType, true), true), true) :: - StructField("arrayOfBigInteger", ArrayType(DecimalType(21, 0), true), true) :: - StructField("arrayOfBoolean", ArrayType(BooleanType, true), true) :: - StructField("arrayOfDouble", ArrayType(DoubleType, true), true) :: - StructField("arrayOfInteger", ArrayType(LongType, true), true) :: - StructField("arrayOfLong", ArrayType(LongType, true), true) :: - StructField("arrayOfNull", ArrayType(StringType, true), true) :: - StructField("arrayOfString", ArrayType(StringType, true), true) :: - StructField("arrayOfStruct", ArrayType( - StructType( - StructField("field1", BooleanType, true) :: - StructField("field2", StringType, true) :: - StructField("field3", StringType, true) :: Nil), true), true) :: - StructField("struct", StructType( - StructField("field1", BooleanType, true) :: - StructField("field2", DecimalType(20, 0), true) :: Nil), true) :: - StructField("structWithArrayFields", StructType( - StructField("field1", ArrayType(LongType, true), true) :: - StructField("field2", ArrayType(StringType, true), true) :: Nil), true) :: Nil) - - assert(expectedSchema === jsonDF.schema) - - jsonDF.registerTempTable("jsonTable") - - // Access elements of a primitive array. - checkAnswer( - sql("select arrayOfString[0], arrayOfString[1], arrayOfString[2] from jsonTable"), - Row("str1", "str2", null) - ) - - // Access an array of null values. - checkAnswer( - sql("select arrayOfNull from jsonTable"), - Row(Seq(null, null, null, null)) - ) - - // Access elements of a BigInteger array (we use DecimalType internally). - checkAnswer( - sql("select arrayOfBigInteger[0], arrayOfBigInteger[1], arrayOfBigInteger[2] from jsonTable"), - Row(new java.math.BigDecimal("922337203685477580700"), - new java.math.BigDecimal("-922337203685477580800"), null) - ) - - // Access elements of an array of arrays. - checkAnswer( - sql("select arrayOfArray1[0], arrayOfArray1[1] from jsonTable"), - Row(Seq("1", "2", "3"), Seq("str1", "str2")) - ) - - // Access elements of an array of arrays. - checkAnswer( - sql("select arrayOfArray2[0], arrayOfArray2[1] from jsonTable"), - Row(Seq(1.0, 2.0, 3.0), Seq(1.1, 2.1, 3.1)) - ) - - // Access elements of an array inside a filed with the type of ArrayType(ArrayType). - checkAnswer( - sql("select arrayOfArray1[1][1], arrayOfArray2[1][1] from jsonTable"), - Row("str2", 2.1) - ) - - // Access elements of an array of structs. 
- checkAnswer( - sql("select arrayOfStruct[0], arrayOfStruct[1], arrayOfStruct[2], arrayOfStruct[3] " + - "from jsonTable"), - Row( - Row(true, "str1", null), - Row(false, null, null), - Row(null, null, null), - null) - ) - - // Access a struct and fields inside of it. - checkAnswer( - sql("select struct, struct.field1, struct.field2 from jsonTable"), - Row( - Row(true, new java.math.BigDecimal("92233720368547758070")), - true, - new java.math.BigDecimal("92233720368547758070")) :: Nil - ) - - // Access an array field of a struct. - checkAnswer( - sql("select structWithArrayFields.field1, structWithArrayFields.field2 from jsonTable"), - Row(Seq(4, 5, 6), Seq("str1", "str2")) - ) - - // Access elements of an array field of a struct. - checkAnswer( - sql("select structWithArrayFields.field1[1], structWithArrayFields.field2[3] from jsonTable"), - Row(5, null) - ) - } - - test("GetField operation on complex data type") { - val jsonDF = ctx.read.json(complexFieldAndType1) - jsonDF.registerTempTable("jsonTable") - - checkAnswer( - sql("select arrayOfStruct[0].field1, arrayOfStruct[0].field2 from jsonTable"), - Row(true, "str1") - ) - - // Getting all values of a specific field from an array of structs. - checkAnswer( - sql("select arrayOfStruct.field1, arrayOfStruct.field2 from jsonTable"), - Row(Seq(true, false, null), Seq("str1", null, null)) - ) - } - - test("Type conflict in primitive field values") { - val jsonDF = ctx.read.json(primitiveFieldValueTypeConflict) - - val expectedSchema = StructType( - StructField("num_bool", StringType, true) :: - StructField("num_num_1", LongType, true) :: - StructField("num_num_2", DoubleType, true) :: - StructField("num_num_3", DoubleType, true) :: - StructField("num_str", StringType, true) :: - StructField("str_bool", StringType, true) :: Nil) - - assert(expectedSchema === jsonDF.schema) - - jsonDF.registerTempTable("jsonTable") - - checkAnswer( - sql("select * from jsonTable"), - Row("true", 11L, null, 1.1, "13.1", "str1") :: - Row("12", null, 21474836470.9, null, null, "true") :: - Row("false", 21474836470L, 92233720368547758070d, 100, "str1", "false") :: - Row(null, 21474836570L, 1.1, 21474836470L, "92233720368547758070", null) :: Nil - ) - - // Number and Boolean conflict: resolve the type as number in this query. - checkAnswer( - sql("select num_bool - 10 from jsonTable where num_bool > 11"), - Row(2) - ) - - // Widening to LongType - checkAnswer( - sql("select num_num_1 - 100 from jsonTable where num_num_1 > 11"), - Row(21474836370L) :: Row(21474836470L) :: Nil - ) - - checkAnswer( - sql("select num_num_1 - 100 from jsonTable where num_num_1 > 10"), - Row(-89) :: Row(21474836370L) :: Row(21474836470L) :: Nil - ) - - // Widening to DecimalType - checkAnswer( - sql("select num_num_2 + 1.3 from jsonTable where num_num_2 > 1.1"), - Row(21474836472.2) :: - Row(92233720368547758071.3) :: Nil - ) - - // Widening to Double - checkAnswer( - sql("select num_num_3 + 1.2 from jsonTable where num_num_3 > 1.1"), - Row(101.2) :: Row(21474836471.2) :: Nil - ) - - // Number and String conflict: resolve the type as number in this query. - checkAnswer( - sql("select num_str + 1.2 from jsonTable where num_str > 14"), - Row(BigDecimal("92233720368547758071.2")) - ) - - // Number and String conflict: resolve the type as number in this query. - checkAnswer( - sql("select num_str + 1.2 from jsonTable where num_str >= 92233720368547758060"), - Row(new java.math.BigDecimal("92233720368547758071.2")) - ) - - // String and Boolean conflict: resolve the type as string. 
- checkAnswer( - sql("select * from jsonTable where str_bool = 'str1'"), - Row("true", 11L, null, 1.1, "13.1", "str1") - ) - } - - ignore("Type conflict in primitive field values (Ignored)") { - val jsonDF = ctx.read.json(primitiveFieldValueTypeConflict) - jsonDF.registerTempTable("jsonTable") - - // Right now, the analyzer does not promote strings in a boolean expression. - // Number and Boolean conflict: resolve the type as boolean in this query. - checkAnswer( - sql("select num_bool from jsonTable where NOT num_bool"), - Row(false) - ) - - checkAnswer( - sql("select str_bool from jsonTable where NOT str_bool"), - Row(false) - ) - - // Right now, the analyzer does not know that num_bool should be treated as a boolean. - // Number and Boolean conflict: resolve the type as boolean in this query. - checkAnswer( - sql("select num_bool from jsonTable where num_bool"), - Row(true) - ) - - checkAnswer( - sql("select str_bool from jsonTable where str_bool"), - Row(false) - ) - - // The plan of the following DSL is - // Project [(CAST(num_str#65:4, DoubleType) + 1.2) AS num#78] - // Filter (CAST(CAST(num_str#65:4, DoubleType), DecimalType) > 92233720368547758060) - // ExistingRdd [num_bool#61,num_num_1#62L,num_num_2#63,num_num_3#64,num_str#65,str_bool#66] - // We should directly cast num_str to DecimalType and also need to do the right type promotion - // in the Project. - checkAnswer( - jsonDF. - where('num_str >= BigDecimal("92233720368547758060")). - select(('num_str + 1.2).as("num")), - Row(new java.math.BigDecimal("92233720368547758071.2").doubleValue()) - ) - - // The following test will fail. The type of num_str is StringType. - // So, to evaluate num_str + 1.2, we first need to use Cast to convert the type. - // In our test data, one value of num_str is 13.1. - // The result of (CAST(num_str#65:4, DoubleType) + 1.2) for this value is 14.299999999999999, - // which is not 14.3. - // Number and String conflict: resolve the type as number in this query. 
- checkAnswer( - sql("select num_str + 1.2 from jsonTable where num_str > 13"), - Row(BigDecimal("14.3")) :: Row(BigDecimal("92233720368547758071.2")) :: Nil - ) - } - - test("Type conflict in complex field values") { - val jsonDF = ctx.read.json(complexFieldValueTypeConflict) - - val expectedSchema = StructType( - StructField("array", ArrayType(LongType, true), true) :: - StructField("num_struct", StringType, true) :: - StructField("str_array", StringType, true) :: - StructField("struct", StructType( - StructField("field", StringType, true) :: Nil), true) :: - StructField("struct_array", StringType, true) :: Nil) - - assert(expectedSchema === jsonDF.schema) - - jsonDF.registerTempTable("jsonTable") - - checkAnswer( - sql("select * from jsonTable"), - Row(Seq(), "11", "[1,2,3]", Row(null), "[]") :: - Row(null, """{"field":false}""", null, null, "{}") :: - Row(Seq(4, 5, 6), null, "str", Row(null), "[7,8,9]") :: - Row(Seq(7), "{}", """["str1","str2",33]""", Row("str"), """{"field":true}""") :: Nil - ) - } - - test("Type conflict in array elements") { - val jsonDF = ctx.read.json(arrayElementTypeConflict) - - val expectedSchema = StructType( - StructField("array1", ArrayType(StringType, true), true) :: - StructField("array2", ArrayType(StructType( - StructField("field", LongType, true) :: Nil), true), true) :: - StructField("array3", ArrayType(StringType, true), true) :: Nil) - - assert(expectedSchema === jsonDF.schema) - - jsonDF.registerTempTable("jsonTable") - - checkAnswer( - sql("select * from jsonTable"), - Row(Seq("1", "1.1", "true", null, "[]", "{}", "[2,3,4]", - """{"field":"str"}"""), Seq(Row(214748364700L), Row(1)), null) :: - Row(null, null, Seq("""{"field":"str"}""", """{"field":1}""")) :: - Row(null, null, Seq("1", "2", "3")) :: Nil - ) - - // Treat an element as a number. 
- checkAnswer( - sql("select array1[0] + 1 from jsonTable where array1 is not null"), - Row(2) - ) - } - - test("Handling missing fields") { - val jsonDF = ctx.read.json(missingFields) - - val expectedSchema = StructType( - StructField("a", BooleanType, true) :: - StructField("b", LongType, true) :: - StructField("c", ArrayType(LongType, true), true) :: - StructField("d", StructType( - StructField("field", BooleanType, true) :: Nil), true) :: - StructField("e", StringType, true) :: Nil) - - assert(expectedSchema === jsonDF.schema) - - jsonDF.registerTempTable("jsonTable") - } - - test("jsonFile should be based on JSONRelation") { - val dir = Utils.createTempDir() - dir.delete() - val path = dir.getCanonicalFile.toURI.toString - ctx.sparkContext.parallelize(1 to 100) - .map(i => s"""{"a": 1, "b": "str$i"}""").saveAsTextFile(path) - val jsonDF = ctx.read.option("samplingRatio", "0.49").json(path) - - val analyzed = jsonDF.queryExecution.analyzed - assert( - analyzed.isInstanceOf[LogicalRelation], - "The DataFrame returned by jsonFile should be based on LogicalRelation.") - val relation = analyzed.asInstanceOf[LogicalRelation].relation - assert( - relation.isInstanceOf[JSONRelation], - "The DataFrame returned by jsonFile should be based on JSONRelation.") - assert(relation.asInstanceOf[JSONRelation].paths === Array(path)) - assert(relation.asInstanceOf[JSONRelation].samplingRatio === (0.49 +- 0.001)) - - val schema = StructType(StructField("a", LongType, true) :: Nil) - val logicalRelation = - ctx.read.schema(schema).json(path).queryExecution.analyzed.asInstanceOf[LogicalRelation] - val relationWithSchema = logicalRelation.relation.asInstanceOf[JSONRelation] - assert(relationWithSchema.paths === Array(path)) - assert(relationWithSchema.schema === schema) - assert(relationWithSchema.samplingRatio > 0.99) - } - - test("Loading a JSON dataset from a text file") { - val dir = Utils.createTempDir() - dir.delete() - val path = dir.getCanonicalPath - primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path) - val jsonDF = ctx.read.json(path) - - val expectedSchema = StructType( - StructField("bigInteger", DecimalType(20, 0), true) :: - StructField("boolean", BooleanType, true) :: - StructField("double", DoubleType, true) :: - StructField("integer", LongType, true) :: - StructField("long", LongType, true) :: - StructField("null", StringType, true) :: - StructField("string", StringType, true) :: Nil) - - assert(expectedSchema === jsonDF.schema) - - jsonDF.registerTempTable("jsonTable") - - checkAnswer( - sql("select * from jsonTable"), - Row(new java.math.BigDecimal("92233720368547758070"), - true, - 1.7976931348623157E308, - 10, - 21474836470L, - null, - "this is a simple string.") - ) - } - - test("Loading a JSON dataset from a text file with SQL") { - val dir = Utils.createTempDir() - dir.delete() - val path = dir.getCanonicalPath - primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path) - - sql( - s""" - |CREATE TEMPORARY TABLE jsonTableSQL - |USING org.apache.spark.sql.json - |OPTIONS ( - | path '$path' - |) - """.stripMargin) - - checkAnswer( - sql("select * from jsonTableSQL"), - Row(new java.math.BigDecimal("92233720368547758070"), - true, - 1.7976931348623157E308, - 10, - 21474836470L, - null, - "this is a simple string.") - ) - } - - test("Applying schemas") { - val dir = Utils.createTempDir() - dir.delete() - val path = dir.getCanonicalPath - primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path) - - 
val schema = StructType( - StructField("bigInteger", DecimalType.SYSTEM_DEFAULT, true) :: - StructField("boolean", BooleanType, true) :: - StructField("double", DoubleType, true) :: - StructField("integer", IntegerType, true) :: - StructField("long", LongType, true) :: - StructField("null", StringType, true) :: - StructField("string", StringType, true) :: Nil) - - val jsonDF1 = ctx.read.schema(schema).json(path) - - assert(schema === jsonDF1.schema) - - jsonDF1.registerTempTable("jsonTable1") - - checkAnswer( - sql("select * from jsonTable1"), - Row(new java.math.BigDecimal("92233720368547758070"), - true, - 1.7976931348623157E308, - 10, - 21474836470L, - null, - "this is a simple string.") - ) - - val jsonDF2 = ctx.read.schema(schema).json(primitiveFieldAndType) - - assert(schema === jsonDF2.schema) - - jsonDF2.registerTempTable("jsonTable2") - - checkAnswer( - sql("select * from jsonTable2"), - Row(new java.math.BigDecimal("92233720368547758070"), - true, - 1.7976931348623157E308, - 10, - 21474836470L, - null, - "this is a simple string.") - ) - } - - test("Applying schemas with MapType") { - val schemaWithSimpleMap = StructType( - StructField("map", MapType(StringType, IntegerType, true), false) :: Nil) - val jsonWithSimpleMap = ctx.read.schema(schemaWithSimpleMap).json(mapType1) - - jsonWithSimpleMap.registerTempTable("jsonWithSimpleMap") - - checkAnswer( - sql("select map from jsonWithSimpleMap"), - Row(Map("a" -> 1)) :: - Row(Map("b" -> 2)) :: - Row(Map("c" -> 3)) :: - Row(Map("c" -> 1, "d" -> 4)) :: - Row(Map("e" -> null)) :: Nil - ) - - checkAnswer( - sql("select map['c'] from jsonWithSimpleMap"), - Row(null) :: - Row(null) :: - Row(3) :: - Row(1) :: - Row(null) :: Nil - ) - - val innerStruct = StructType( - StructField("field1", ArrayType(IntegerType, true), true) :: - StructField("field2", IntegerType, true) :: Nil) - val schemaWithComplexMap = StructType( - StructField("map", MapType(StringType, innerStruct, true), false) :: Nil) - - val jsonWithComplexMap = ctx.read.schema(schemaWithComplexMap).json(mapType2) - - jsonWithComplexMap.registerTempTable("jsonWithComplexMap") - - checkAnswer( - sql("select map from jsonWithComplexMap"), - Row(Map("a" -> Row(Seq(1, 2, 3, null), null))) :: - Row(Map("b" -> Row(null, 2))) :: - Row(Map("c" -> Row(Seq(), 4))) :: - Row(Map("c" -> Row(null, 3), "d" -> Row(Seq(null), null))) :: - Row(Map("e" -> null)) :: - Row(Map("f" -> Row(null, null))) :: Nil - ) - - checkAnswer( - sql("select map['a'].field1, map['c'].field2 from jsonWithComplexMap"), - Row(Seq(1, 2, 3, null), null) :: - Row(null, null) :: - Row(null, 4) :: - Row(null, 3) :: - Row(null, null) :: - Row(null, null) :: Nil - ) - } - - test("SPARK-2096 Correctly parse dot notations") { - val jsonDF = ctx.read.json(complexFieldAndType2) - jsonDF.registerTempTable("jsonTable") - - checkAnswer( - sql("select arrayOfStruct[0].field1, arrayOfStruct[0].field2 from jsonTable"), - Row(true, "str1") - ) - checkAnswer( - sql( - """ - |select complexArrayOfStruct[0].field1[1].inner2[0], complexArrayOfStruct[1].field2[0][1] - |from jsonTable - """.stripMargin), - Row("str2", 6) - ) - } - - test("SPARK-3390 Complex arrays") { - val jsonDF = ctx.read.json(complexFieldAndType2) - jsonDF.registerTempTable("jsonTable") - - checkAnswer( - sql( - """ - |select arrayOfArray1[0][0][0], arrayOfArray1[1][0][1], arrayOfArray1[1][1][0] - |from jsonTable - """.stripMargin), - Row(5, 7, 8) - ) - checkAnswer( - sql( - """ - |select arrayOfArray2[0][0][0].inner1, arrayOfArray2[1][0], - 
|arrayOfArray2[1][1][1].inner2[0], arrayOfArray2[2][0][0].inner3[0][0].inner4 - |from jsonTable - """.stripMargin), - Row("str1", Nil, "str4", 2) - ) - } - - test("SPARK-3308 Read top level JSON arrays") { - val jsonDF = ctx.read.json(jsonArray) - jsonDF.registerTempTable("jsonTable") - - checkAnswer( - sql( - """ - |select a, b, c - |from jsonTable - """.stripMargin), - Row("str_a_1", null, null) :: - Row("str_a_2", null, null) :: - Row(null, "str_b_3", null) :: - Row("str_a_4", "str_b_4", "str_c_4") :: Nil - ) - } - - test("Corrupt records") { - // Test if we can query corrupt records. - val oldColumnNameOfCorruptRecord = ctx.conf.columnNameOfCorruptRecord - ctx.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_unparsed") - - val jsonDF = ctx.read.json(corruptRecords) - jsonDF.registerTempTable("jsonTable") - - val schema = StructType( - StructField("_unparsed", StringType, true) :: - StructField("a", StringType, true) :: - StructField("b", StringType, true) :: - StructField("c", StringType, true) :: Nil) - - assert(schema === jsonDF.schema) - - // In HiveContext, backticks should be used to access columns starting with a underscore. - checkAnswer( - sql( - """ - |SELECT a, b, c, _unparsed - |FROM jsonTable - """.stripMargin), - Row(null, null, null, "{") :: - Row(null, null, null, "") :: - Row(null, null, null, """{"a":1, b:2}""") :: - Row(null, null, null, """{"a":{, b:3}""") :: - Row("str_a_4", "str_b_4", "str_c_4", null) :: - Row(null, null, null, "]") :: Nil - ) - - checkAnswer( - sql( - """ - |SELECT a, b, c - |FROM jsonTable - |WHERE _unparsed IS NULL - """.stripMargin), - Row("str_a_4", "str_b_4", "str_c_4") - ) - - checkAnswer( - sql( - """ - |SELECT _unparsed - |FROM jsonTable - |WHERE _unparsed IS NOT NULL - """.stripMargin), - Row("{") :: - Row("") :: - Row("""{"a":1, b:2}""") :: - Row("""{"a":{, b:3}""") :: - Row("]") :: Nil - ) - - ctx.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, oldColumnNameOfCorruptRecord) - } - - test("SPARK-4068: nulls in arrays") { - val jsonDF = ctx.read.json(nullsInArrays) - jsonDF.registerTempTable("jsonTable") - - val schema = StructType( - StructField("field1", - ArrayType(ArrayType(ArrayType(ArrayType(StringType, true), true), true), true), true) :: - StructField("field2", - ArrayType(ArrayType( - StructType(StructField("Test", LongType, true) :: Nil), true), true), true) :: - StructField("field3", - ArrayType(ArrayType( - StructType(StructField("Test", StringType, true) :: Nil), true), true), true) :: - StructField("field4", - ArrayType(ArrayType(ArrayType(LongType, true), true), true), true) :: Nil) - - assert(schema === jsonDF.schema) - - checkAnswer( - sql( - """ - |SELECT field1, field2, field3, field4 - |FROM jsonTable - """.stripMargin), - Row(Seq(Seq(null), Seq(Seq(Seq("Test")))), null, null, null) :: - Row(null, Seq(null, Seq(Row(1))), null, null) :: - Row(null, null, Seq(Seq(null), Seq(Row("2"))), null) :: - Row(null, null, null, Seq(Seq(null, Seq(1, 2, 3)))) :: Nil - ) - } - - test("SPARK-4228 DataFrame to JSON") { - val schema1 = StructType( - StructField("f1", IntegerType, false) :: - StructField("f2", StringType, false) :: - StructField("f3", BooleanType, false) :: - StructField("f4", ArrayType(StringType), nullable = true) :: - StructField("f5", IntegerType, true) :: Nil) - - val rowRDD1 = unparsedStrings.map { r => - val values = r.split(",").map(_.trim) - val v5 = try values(3).toInt catch { - case _: NumberFormatException => null - } - Row(values(0).toInt, values(1), values(2).toBoolean, r.split(",").toList, v5) - } - - val 
df1 = ctx.createDataFrame(rowRDD1, schema1) - df1.registerTempTable("applySchema1") - val df2 = df1.toDF - val result = df2.toJSON.collect() - // scalastyle:off - assert(result(0) === "{\"f1\":1,\"f2\":\"A1\",\"f3\":true,\"f4\":[\"1\",\" A1\",\" true\",\" null\"]}") - assert(result(3) === "{\"f1\":4,\"f2\":\"D4\",\"f3\":true,\"f4\":[\"4\",\" D4\",\" true\",\" 2147483644\"],\"f5\":2147483644}") - // scalastyle:on - - val schema2 = StructType( - StructField("f1", StructType( - StructField("f11", IntegerType, false) :: - StructField("f12", BooleanType, false) :: Nil), false) :: - StructField("f2", MapType(StringType, IntegerType, true), false) :: Nil) - - val rowRDD2 = unparsedStrings.map { r => - val values = r.split(",").map(_.trim) - val v4 = try values(3).toInt catch { - case _: NumberFormatException => null - } - Row(Row(values(0).toInt, values(2).toBoolean), Map(values(1) -> v4)) - } - - val df3 = ctx.createDataFrame(rowRDD2, schema2) - df3.registerTempTable("applySchema2") - val df4 = df3.toDF - val result2 = df4.toJSON.collect() - - assert(result2(1) === "{\"f1\":{\"f11\":2,\"f12\":false},\"f2\":{\"B2\":null}}") - assert(result2(3) === "{\"f1\":{\"f11\":4,\"f12\":true},\"f2\":{\"D4\":2147483644}}") - - val jsonDF = ctx.read.json(primitiveFieldAndType) - val primTable = ctx.read.json(jsonDF.toJSON) - primTable.registerTempTable("primitiveTable") - checkAnswer( - sql("select * from primitiveTable"), - Row(new java.math.BigDecimal("92233720368547758070"), - true, - 1.7976931348623157E308, - 10, - 21474836470L, - "this is a simple string.") - ) - - val complexJsonDF = ctx.read.json(complexFieldAndType1) - val compTable = ctx.read.json(complexJsonDF.toJSON) - compTable.registerTempTable("complexTable") - // Access elements of a primitive array. - checkAnswer( - sql("select arrayOfString[0], arrayOfString[1], arrayOfString[2] from complexTable"), - Row("str1", "str2", null) - ) - - // Access an array of null values. - checkAnswer( - sql("select arrayOfNull from complexTable"), - Row(Seq(null, null, null, null)) - ) - - // Access elements of a BigInteger array (we use DecimalType internally). - checkAnswer( - sql("select arrayOfBigInteger[0], arrayOfBigInteger[1], arrayOfBigInteger[2] " + - " from complexTable"), - Row(new java.math.BigDecimal("922337203685477580700"), - new java.math.BigDecimal("-922337203685477580800"), null) - ) - - // Access elements of an array of arrays. - checkAnswer( - sql("select arrayOfArray1[0], arrayOfArray1[1] from complexTable"), - Row(Seq("1", "2", "3"), Seq("str1", "str2")) - ) - - // Access elements of an array of arrays. - checkAnswer( - sql("select arrayOfArray2[0], arrayOfArray2[1] from complexTable"), - Row(Seq(1.0, 2.0, 3.0), Seq(1.1, 2.1, 3.1)) - ) - - // Access elements of an array inside a field with the type of ArrayType(ArrayType). - checkAnswer( - sql("select arrayOfArray1[1][1], arrayOfArray2[1][1] from complexTable"), - Row("str2", 2.1) - ) - - // Access a struct and fields inside of it. - checkAnswer( - sql("select struct, struct.field1, struct.field2 from complexTable"), - Row( - Row(true, new java.math.BigDecimal("92233720368547758070")), - true, - new java.math.BigDecimal("92233720368547758070")) :: Nil - ) - - // Access an array field of a struct. - checkAnswer( - sql("select structWithArrayFields.field1, structWithArrayFields.field2 from complexTable"), - Row(Seq(4, 5, 6), Seq("str1", "str2")) - ) - - // Access elements of an array field of a struct.
- checkAnswer( - sql("select structWithArrayFields.field1[1], structWithArrayFields.field2[3] " + - "from complexTable"), - Row(5, null) - ) - } - - test("JSONRelation equality test") { - val context = org.apache.spark.sql.test.TestSQLContext - - val relation0 = new JSONRelation( - Some(empty), - 1.0, - Some(StructType(StructField("a", IntegerType, true) :: Nil)), - None, None)(context) - val logicalRelation0 = LogicalRelation(relation0) - val relation1 = new JSONRelation( - Some(singleRow), - 1.0, - Some(StructType(StructField("a", IntegerType, true) :: Nil)), - None, None)(context) - val logicalRelation1 = LogicalRelation(relation1) - val relation2 = new JSONRelation( - Some(singleRow), - 0.5, - Some(StructType(StructField("a", IntegerType, true) :: Nil)), - None, None)(context) - val logicalRelation2 = LogicalRelation(relation2) - val relation3 = new JSONRelation( - Some(singleRow), - 1.0, - Some(StructType(StructField("b", IntegerType, true) :: Nil)), - None, None)(context) - val logicalRelation3 = LogicalRelation(relation3) - - assert(relation0 !== relation1) - assert(!logicalRelation0.sameResult(logicalRelation1), - s"$logicalRelation0 and $logicalRelation1 should be considered not having the same result.") - - assert(relation1 === relation2) - assert(logicalRelation1.sameResult(logicalRelation2), - s"$logicalRelation1 and $logicalRelation2 should be considered having the same result.") - - assert(relation1 !== relation3) - assert(!logicalRelation1.sameResult(logicalRelation3), - s"$logicalRelation1 and $logicalRelation3 should be considered not having the same result.") - - assert(relation2 !== relation3) - assert(!logicalRelation2.sameResult(logicalRelation3), - s"$logicalRelation2 and $logicalRelation3 should be considered not having the same result.") - - withTempPath(dir => { - val path = dir.getCanonicalFile.toURI.toString - ctx.sparkContext.parallelize(1 to 100) - .map(i => s"""{"a": 1, "b": "str$i"}""").saveAsTextFile(path) - - val d1 = ResolvedDataSource( - context, - userSpecifiedSchema = None, - partitionColumns = Array.empty[String], - provider = classOf[DefaultSource].getCanonicalName, - options = Map("path" -> path)) - - val d2 = ResolvedDataSource( - context, - userSpecifiedSchema = None, - partitionColumns = Array.empty[String], - provider = classOf[DefaultSource].getCanonicalName, - options = Map("path" -> path)) - assert(d1 === d2) - }) - } - - test("SPARK-6245 JsonRDD.inferSchema on empty RDD") { - // This is really a test that it doesn't throw an exception - val emptySchema = InferSchema(empty, 1.0, "") - assert(StructType(Seq()) === emptySchema) - } - - test("SPARK-7565 MapType in JsonRDD") { - val oldColumnNameOfCorruptRecord = ctx.conf.columnNameOfCorruptRecord - ctx.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_unparsed") - - val schemaWithSimpleMap = StructType( - StructField("map", MapType(StringType, IntegerType, true), false) :: Nil) - try { - val temp = Utils.createTempDir().getPath - - val df = ctx.read.schema(schemaWithSimpleMap).json(mapType1) - df.write.mode("overwrite").parquet(temp) - // order of MapType is not defined - assert(ctx.read.parquet(temp).count() == 5) - - val df2 = ctx.read.json(corruptRecords) - df2.write.mode("overwrite").parquet(temp) - checkAnswer(ctx.read.parquet(temp), df2.collect()) - } finally { - ctx.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, oldColumnNameOfCorruptRecord) - } - } - - test("SPARK-8093 Erase empty structs") { - val emptySchema = InferSchema(emptyRecords, 1.0, "") - assert(StructType(Seq()) === emptySchema) 
- } - - test("JSON with Partition") { - def makePartition(rdd: RDD[String], parent: File, partName: String, partValue: Any): File = { - val p = new File(parent, s"$partName=${partValue.toString}") - rdd.saveAsTextFile(p.getCanonicalPath) - p - } - - withTempPath(root => { - val d1 = new File(root, "d1=1") - // root/dt=1/col1=abc - val p1_col1 = makePartition( - ctx.sparkContext.parallelize(2 to 5).map(i => s"""{"a": 1, "b": "str$i"}"""), - d1, - "col1", - "abc") - - // root/dt=1/col1=abd - val p2 = makePartition( - ctx.sparkContext.parallelize(6 to 10).map(i => s"""{"a": 1, "b": "str$i"}"""), - d1, - "col1", - "abd") - - ctx.read.json(root.getAbsolutePath).registerTempTable("test_myjson_with_part") - checkAnswer( - sql("SELECT count(a) FROM test_myjson_with_part where d1 = 1 and col1='abc'"), Row(4)) - checkAnswer( - sql("SELECT count(a) FROM test_myjson_with_part where d1 = 1 and col1='abd'"), Row(5)) - checkAnswer(sql("SELECT count(a) FROM test_myjson_with_part where d1 = 1"), Row(9)) - }) - } -}
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala deleted file mode 100644 index 369df56..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.json - -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.SQLContext - -trait TestJsonData { - - protected def ctx: SQLContext - - def primitiveFieldAndType: RDD[String] = - ctx.sparkContext.parallelize( - """{"string":"this is a simple string.", - "integer":10, - "long":21474836470, - "bigInteger":92233720368547758070, - "double":1.7976931348623157E308, - "boolean":true, - "null":null - }""" :: Nil) - - def primitiveFieldValueTypeConflict: RDD[String] = - ctx.sparkContext.parallelize( - """{"num_num_1":11, "num_num_2":null, "num_num_3": 1.1, - "num_bool":true, "num_str":13.1, "str_bool":"str1"}""" :: - """{"num_num_1":null, "num_num_2":21474836470.9, "num_num_3": null, - "num_bool":12, "num_str":null, "str_bool":true}""" :: - """{"num_num_1":21474836470, "num_num_2":92233720368547758070, "num_num_3": 100, - "num_bool":false, "num_str":"str1", "str_bool":false}""" :: - """{"num_num_1":21474836570, "num_num_2":1.1, "num_num_3": 21474836470, - "num_bool":null, "num_str":92233720368547758070, "str_bool":null}""" :: Nil) - - def jsonNullStruct: RDD[String] = - ctx.sparkContext.parallelize( - """{"nullstr":"","ip":"27.31.100.29","headers":{"Host":"1.abc.com","Charset":"UTF-8"}}""" :: - """{"nullstr":"","ip":"27.31.100.29","headers":{}}""" :: - """{"nullstr":"","ip":"27.31.100.29","headers":""}""" :: - """{"nullstr":null,"ip":"27.31.100.29","headers":null}""" :: Nil) - - def complexFieldValueTypeConflict: RDD[String] = - ctx.sparkContext.parallelize( - """{"num_struct":11, "str_array":[1, 2, 3], - "array":[], "struct_array":[], "struct": {}}""" :: - """{"num_struct":{"field":false}, "str_array":null, - "array":null, "struct_array":{}, "struct": null}""" :: - """{"num_struct":null, "str_array":"str", - "array":[4, 5, 6], "struct_array":[7, 8, 9], "struct": {"field":null}}""" :: - """{"num_struct":{}, "str_array":["str1", "str2", 33], - "array":[7], "struct_array":{"field": true}, "struct": {"field": "str"}}""" :: Nil) - - def arrayElementTypeConflict: RDD[String] = - ctx.sparkContext.parallelize( - """{"array1": [1, 1.1, true, null, [], {}, [2,3,4], {"field":"str"}], - "array2": [{"field":214748364700}, {"field":1}]}""" :: - """{"array3": [{"field":"str"}, 
{"field":1}]}""" :: - """{"array3": [1, 2, 3]}""" :: Nil) - - def missingFields: RDD[String] = - ctx.sparkContext.parallelize( - """{"a":true}""" :: - """{"b":21474836470}""" :: - """{"c":[33, 44]}""" :: - """{"d":{"field":true}}""" :: - """{"e":"str"}""" :: Nil) - - def complexFieldAndType1: RDD[String] = - ctx.sparkContext.parallelize( - """{"struct":{"field1": true, "field2": 92233720368547758070}, - "structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]}, - "arrayOfString":["str1", "str2"], - "arrayOfInteger":[1, 2147483647, -2147483648], - "arrayOfLong":[21474836470, 9223372036854775807, -9223372036854775808], - "arrayOfBigInteger":[922337203685477580700, -922337203685477580800], - "arrayOfDouble":[1.2, 1.7976931348623157E308, 4.9E-324, 2.2250738585072014E-308], - "arrayOfBoolean":[true, false, true], - "arrayOfNull":[null, null, null, null], - "arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}], - "arrayOfArray1":[[1, 2, 3], ["str1", "str2"]], - "arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]] - }""" :: Nil) - - def complexFieldAndType2: RDD[String] = - ctx.sparkContext.parallelize( - """{"arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}], - "complexArrayOfStruct": [ - { - "field1": [ - { - "inner1": "str1" - }, - { - "inner2": ["str2", "str22"] - }], - "field2": [[1, 2], [3, 4]] - }, - { - "field1": [ - { - "inner2": ["str3", "str33"] - }, - { - "inner1": "str4" - }], - "field2": [[5, 6], [7, 8]] - }], - "arrayOfArray1": [ - [ - [5] - ], - [ - [6, 7], - [8] - ]], - "arrayOfArray2": [ - [ - [ - { - "inner1": "str1" - } - ] - ], - [ - [], - [ - {"inner2": ["str3", "str33"]}, - {"inner2": ["str4"], "inner1": "str11"} - ] - ], - [ - [ - {"inner3": [[{"inner4": 2}]]} - ] - ]] - }""" :: Nil) - - def mapType1: RDD[String] = - ctx.sparkContext.parallelize( - """{"map": {"a": 1}}""" :: - """{"map": {"b": 2}}""" :: - """{"map": {"c": 3}}""" :: - """{"map": {"c": 1, "d": 4}}""" :: - """{"map": {"e": null}}""" :: Nil) - - def mapType2: RDD[String] = - ctx.sparkContext.parallelize( - """{"map": {"a": {"field1": [1, 2, 3, null]}}}""" :: - """{"map": {"b": {"field2": 2}}}""" :: - """{"map": {"c": {"field1": [], "field2": 4}}}""" :: - """{"map": {"c": {"field2": 3}, "d": {"field1": [null]}}}""" :: - """{"map": {"e": null}}""" :: - """{"map": {"f": {"field1": null}}}""" :: Nil) - - def nullsInArrays: RDD[String] = - ctx.sparkContext.parallelize( - """{"field1":[[null], [[["Test"]]]]}""" :: - """{"field2":[null, [{"Test":1}]]}""" :: - """{"field3":[[null], [{"Test":"2"}]]}""" :: - """{"field4":[[null, [1,2,3]]]}""" :: Nil) - - def jsonArray: RDD[String] = - ctx.sparkContext.parallelize( - """[{"a":"str_a_1"}]""" :: - """[{"a":"str_a_2"}, {"b":"str_b_3"}]""" :: - """{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" :: - """[]""" :: Nil) - - def corruptRecords: RDD[String] = - ctx.sparkContext.parallelize( - """{""" :: - """""" :: - """{"a":1, b:2}""" :: - """{"a":{, b:3}""" :: - """{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" :: - """]""" :: Nil) - - def emptyRecords: RDD[String] = - ctx.sparkContext.parallelize( - """{""" :: - """""" :: - """{"a": {}}""" :: - """{"a": {"b": {}}}""" :: - """{"b": [{"c": {}}]}""" :: - """]""" :: Nil) - - lazy val singleRow: RDD[String] = - ctx.sparkContext.parallelize( - """{"a":123}""" :: Nil) - - def empty: RDD[String] = ctx.sparkContext.parallelize(Seq[String]()) -} 
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/scala/org/apache/spark/sql/metric/SQLMetricsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/metric/SQLMetricsSuite.scala deleted file mode 100644 index d22160f..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/metric/SQLMetricsSuite.scala +++ /dev/null @@ -1,145 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.spark.sql.metric - -import java.io.{ByteArrayInputStream, ByteArrayOutputStream} - -import scala.collection.mutable - -import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm._ -import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._ - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.test.TestSQLContext -import org.apache.spark.util.Utils - - -class SQLMetricsSuite extends SparkFunSuite { - - test("LongSQLMetric should not box Long") { - val l = SQLMetrics.createLongMetric(TestSQLContext.sparkContext, "long") - val f = () => { l += 1L } - BoxingFinder.getClassReader(f.getClass).foreach { cl => - val boxingFinder = new BoxingFinder() - cl.accept(boxingFinder, 0) - assert(boxingFinder.boxingInvokes.isEmpty, s"Found boxing: ${boxingFinder.boxingInvokes}") - } - } - - test("IntSQLMetric should not box Int") { - val l = SQLMetrics.createIntMetric(TestSQLContext.sparkContext, "Int") - val f = () => { l += 1 } - BoxingFinder.getClassReader(f.getClass).foreach { cl => - val boxingFinder = new BoxingFinder() - cl.accept(boxingFinder, 0) - assert(boxingFinder.boxingInvokes.isEmpty, s"Found boxing: ${boxingFinder.boxingInvokes}") - } - } - - test("Normal accumulator should do boxing") { - // We need this test to make sure BoxingFinder works. - val l = TestSQLContext.sparkContext.accumulator(0L) - val f = () => { l += 1L } - BoxingFinder.getClassReader(f.getClass).foreach { cl => - val boxingFinder = new BoxingFinder() - cl.accept(boxingFinder, 0) - assert(boxingFinder.boxingInvokes.nonEmpty, "Should find boxing in this test") - } - } -} - -private case class MethodIdentifier[T](cls: Class[T], name: String, desc: String) - -/** - * If `method` is null, search all methods of this class recursively to find whether they do any boxing. - * If `method` is specified, only search that method of the class to speed up the search. - - * - * The search skips the methods in `visitedMethods` to avoid potential infinite cycles.
- */ -private class BoxingFinder( - method: MethodIdentifier[_] = null, - val boxingInvokes: mutable.Set[String] = mutable.Set.empty, - visitedMethods: mutable.Set[MethodIdentifier[_]] = mutable.Set.empty) - extends ClassVisitor(ASM4) { - - private val primitiveBoxingClassName = - Set("java/lang/Long", - "java/lang/Double", - "java/lang/Integer", - "java/lang/Float", - "java/lang/Short", - "java/lang/Character", - "java/lang/Byte", - "java/lang/Boolean") - - override def visitMethod( - access: Int, name: String, desc: String, sig: String, exceptions: Array[String]): - MethodVisitor = { - if (method != null && (method.name != name || method.desc != desc)) { - // If method is specified, skip other methods. - return new MethodVisitor(ASM4) {} - } - - new MethodVisitor(ASM4) { - override def visitMethodInsn(op: Int, owner: String, name: String, desc: String) { - if (op == INVOKESPECIAL && name == "<init>" || op == INVOKESTATIC && name == "valueOf") { - if (primitiveBoxingClassName.contains(owner)) { - // Find boxing methods, e.g., new java.lang.Long(l) or java.lang.Long.valueOf(l) - boxingInvokes.add(s"$owner.$name") - } - } else { - // scalastyle:off classforname - val classOfMethodOwner = Class.forName(owner.replace('/', '.'), false, - Thread.currentThread.getContextClassLoader) - // scalastyle:on classforname - val m = MethodIdentifier(classOfMethodOwner, name, desc) - if (!visitedMethods.contains(m)) { - // Keep track of visited methods to avoid potential infinite cycles - visitedMethods += m - BoxingFinder.getClassReader(classOfMethodOwner).foreach { cl => - visitedMethods += m - cl.accept(new BoxingFinder(m, boxingInvokes, visitedMethods), 0) - } - } - } - } - } - } -} - -private object BoxingFinder { - - def getClassReader(cls: Class[_]): Option[ClassReader] = { - val className = cls.getName.replaceFirst("^.*\\.", "") + ".class" - val resourceStream = cls.getResourceAsStream(className) - val baos = new ByteArrayOutputStream(128) - // Copy data over, before delegating to ClassReader - - // else we can run out of open file handles. - Utils.copyStream(resourceStream, baos, true) - // ASM4 doesn't support Java 8 classes, which require ASM5. - // So if the class is ASM5 (e.g., java.lang.Long when using a JDK 8 runtime to run this code), - // then ClassReader will throw IllegalArgumentException. - // However, since this is only for testing, it's safe to skip these classes. - try { - Some(new ClassReader(new ByteArrayInputStream(baos.toByteArray))) - } catch { - case _: IllegalArgumentException => None - } - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetAvroCompatibilitySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetAvroCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetAvroCompatibilitySuite.scala deleted file mode 100644 index bfa4273..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetAvroCompatibilitySuite.scala +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.parquet - -import java.nio.ByteBuffer -import java.util.{List => JList, Map => JMap} - -import scala.collection.JavaConversions._ - -import org.apache.hadoop.fs.Path -import org.apache.parquet.avro.AvroParquetWriter - -import org.apache.spark.sql.parquet.test.avro.{Nested, ParquetAvroCompat} -import org.apache.spark.sql.test.TestSQLContext -import org.apache.spark.sql.{Row, SQLContext} - -class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest { - import ParquetCompatibilityTest._ - - override val sqlContext: SQLContext = TestSQLContext - - override protected def beforeAll(): Unit = { - super.beforeAll() - - val writer = - new AvroParquetWriter[ParquetAvroCompat]( - new Path(parquetStore.getCanonicalPath), - ParquetAvroCompat.getClassSchema) - - (0 until 10).foreach(i => writer.write(makeParquetAvroCompat(i))) - writer.close() - } - - test("Read Parquet file generated by parquet-avro") { - logInfo( - s"""Schema of the Parquet file written by parquet-avro: - |${readParquetSchema(parquetStore.getCanonicalPath)} - """.stripMargin) - - checkAnswer(sqlContext.read.parquet(parquetStore.getCanonicalPath), (0 until 10).map { i => - def nullable[T <: AnyRef]: ( => T) => T = makeNullable[T](i) - - Row( - i % 2 == 0, - i, - i.toLong * 10, - i.toFloat + 0.1f, - i.toDouble + 0.2d, - s"val_$i".getBytes, - s"val_$i", - - nullable(i % 2 == 0: java.lang.Boolean), - nullable(i: Integer), - nullable(i.toLong: java.lang.Long), - nullable(i.toFloat + 0.1f: java.lang.Float), - nullable(i.toDouble + 0.2d: java.lang.Double), - nullable(s"val_$i".getBytes), - nullable(s"val_$i"), - - Seq.tabulate(3)(n => s"arr_${i + n}"), - Seq.tabulate(3)(n => n.toString -> (i + n: Integer)).toMap, - Seq.tabulate(3) { n => - (i + n).toString -> Seq.tabulate(3) { m => - Row(Seq.tabulate(3)(j => i + j + m), s"val_${i + m}") - } - }.toMap) - }) - } - - def makeParquetAvroCompat(i: Int): ParquetAvroCompat = { - def nullable[T <: AnyRef] = makeNullable[T](i) _ - - def makeComplexColumn(i: Int): JMap[String, JList[Nested]] = { - mapAsJavaMap(Seq.tabulate(3) { n => - (i + n).toString -> seqAsJavaList(Seq.tabulate(3) { m => - Nested - .newBuilder() - .setNestedIntsColumn(seqAsJavaList(Seq.tabulate(3)(j => i + j + m))) - .setNestedStringColumn(s"val_${i + m}") - .build() - }) - }.toMap) - } - - ParquetAvroCompat - .newBuilder() - .setBoolColumn(i % 2 == 0) - .setIntColumn(i) - .setLongColumn(i.toLong * 10) - .setFloatColumn(i.toFloat + 0.1f) - .setDoubleColumn(i.toDouble + 0.2d) - .setBinaryColumn(ByteBuffer.wrap(s"val_$i".getBytes)) - .setStringColumn(s"val_$i") - - .setMaybeBoolColumn(nullable(i % 2 == 0: java.lang.Boolean)) - .setMaybeIntColumn(nullable(i: Integer)) - .setMaybeLongColumn(nullable(i.toLong: java.lang.Long)) - .setMaybeFloatColumn(nullable(i.toFloat + 0.1f: java.lang.Float)) - .setMaybeDoubleColumn(nullable(i.toDouble + 0.2d: java.lang.Double)) - .setMaybeBinaryColumn(nullable(ByteBuffer.wrap(s"val_$i".getBytes))) - .setMaybeStringColumn(nullable(s"val_$i")) - - .setStringsColumn(Seq.tabulate(3)(n => s"arr_${i + n}")) - .setStringToIntColumn( - 
mapAsJavaMap(Seq.tabulate(3)(n => n.toString -> (i + n: Integer)).toMap)) - .setComplexColumn(makeComplexColumn(i)) - - .build() - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetCompatibilityTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetCompatibilityTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetCompatibilityTest.scala deleted file mode 100644 index 5747893..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetCompatibilityTest.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.parquet -import java.io.File - -import scala.collection.JavaConversions._ - -import org.apache.hadoop.fs.Path -import org.apache.parquet.hadoop.ParquetFileReader -import org.apache.parquet.schema.MessageType -import org.scalatest.BeforeAndAfterAll - -import org.apache.spark.sql.QueryTest -import org.apache.spark.util.Utils - -abstract class ParquetCompatibilityTest extends QueryTest with ParquetTest with BeforeAndAfterAll { - protected var parquetStore: File = _ - - /** - * Optional path to a staging subdirectory which may be created during query processing - * (Hive does this). - * Parquet files under this directory will be ignored in [[readParquetSchema()]]. - * @return an optional staging directory to ignore when scanning for parquet files.
- */ - protected def stagingDir: Option[String] = None - - override protected def beforeAll(): Unit = { - parquetStore = Utils.createTempDir(namePrefix = "parquet-compat_") - parquetStore.delete() - } - - override protected def afterAll(): Unit = { - Utils.deleteRecursively(parquetStore) - } - - def readParquetSchema(path: String): MessageType = { - val fsPath = new Path(path) - val fs = fsPath.getFileSystem(configuration) - val parquetFiles = fs.listStatus(fsPath).toSeq.filterNot { status => - status.getPath.getName.startsWith("_") || - stagingDir.map(status.getPath.getName.startsWith).getOrElse(false) - } - val footers = ParquetFileReader.readAllFootersInParallel(configuration, parquetFiles, true) - footers.head.getParquetMetadata.getFileMetaData.getSchema - } -} - -object ParquetCompatibilityTest { - def makeNullable[T <: AnyRef](i: Int)(f: => T): T = { - if (i % 3 == 0) null.asInstanceOf[T] else f - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala deleted file mode 100644 index b6a7c4f..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala +++ /dev/null @@ -1,319 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.parquet - -import org.apache.parquet.filter2.predicate.Operators._ -import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators} - -import org.apache.spark.sql.catalyst.dsl.expressions._ -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.types._ -import org.apache.spark.sql.{Column, DataFrame, QueryTest, Row, SQLConf} - -/** - * A test suite that tests Parquet filter2 API based filter pushdown optimization. - * - * NOTE: - * - * 1. `!(a cmp b)` is always transformed to its negated form `a cmp' b` by the - * `BooleanSimplification` optimization rule whenever possible. As a result, predicate `!(a < 1)` - * results in a `GtEq` filter predicate rather than a `Not`. - * - * 2. `Tuple1(Option(x))` is used together with `AnyVal` types like `Int` to ensure the inferred - * data type is nullable. 
- */ -class ParquetFilterSuite extends QueryTest with ParquetTest { - lazy val sqlContext = org.apache.spark.sql.test.TestSQLContext - - private def checkFilterPredicate( - df: DataFrame, - predicate: Predicate, - filterClass: Class[_ <: FilterPredicate], - checker: (DataFrame, Seq[Row]) => Unit, - expected: Seq[Row]): Unit = { - val output = predicate.collect { case a: Attribute => a }.distinct - - withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") { - val query = df - .select(output.map(e => Column(e)): _*) - .where(Column(predicate)) - - val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect { - case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation)) => filters - }.flatten.reduceOption(_ && _) - - assert(maybeAnalyzedPredicate.isDefined) - maybeAnalyzedPredicate.foreach { pred => - val maybeFilter = ParquetFilters.createFilter(pred) - assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred") - maybeFilter.foreach { f => - // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`) - assert(f.getClass === filterClass) - } - } - - checker(query, expected) - } - } - - private def checkFilterPredicate - (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Seq[Row]) - (implicit df: DataFrame): Unit = { - checkFilterPredicate(df, predicate, filterClass, checkAnswer(_, _: Seq[Row]), expected) - } - - private def checkFilterPredicate[T] - (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: T) - (implicit df: DataFrame): Unit = { - checkFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df) - } - - private def checkBinaryFilterPredicate - (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Seq[Row]) - (implicit df: DataFrame): Unit = { - def checkBinaryAnswer(df: DataFrame, expected: Seq[Row]) = { - assertResult(expected.map(_.getAs[Array[Byte]](0).mkString(",")).sorted) { - df.map(_.getAs[Array[Byte]](0).mkString(",")).collect().toSeq.sorted - } - } - - checkFilterPredicate(df, predicate, filterClass, checkBinaryAnswer _, expected) - } - - private def checkBinaryFilterPredicate - (predicate: Predicate, filterClass: Class[_ <: FilterPredicate], expected: Array[Byte]) - (implicit df: DataFrame): Unit = { - checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df) - } - - test("filter pushdown - boolean") { - withParquetDataFrame((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit df => - checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row]) - checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], Seq(Row(true), Row(false))) - - checkFilterPredicate('_1 === true, classOf[Eq[_]], true) - checkFilterPredicate('_1 !== true, classOf[NotEq[_]], false) - } - } - - test("filter pushdown - short") { - withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i.toShort)))) { implicit df => - checkFilterPredicate(Cast('_1, IntegerType) === 1, classOf[Eq[_]], 1) - checkFilterPredicate( - Cast('_1, IntegerType) !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_))) - - checkFilterPredicate(Cast('_1, IntegerType) < 2, classOf[Lt[_]], 1) - checkFilterPredicate(Cast('_1, IntegerType) > 3, classOf[Gt[_]], 4) - checkFilterPredicate(Cast('_1, IntegerType) <= 1, classOf[LtEq[_]], 1) - checkFilterPredicate(Cast('_1, IntegerType) >= 4, classOf[GtEq[_]], 4) - - checkFilterPredicate(Literal(1) === Cast('_1, IntegerType), classOf[Eq[_]], 1) - checkFilterPredicate(Literal(2) > Cast('_1, IntegerType), classOf[Lt[_]], 1) - 
checkFilterPredicate(Literal(3) < Cast('_1, IntegerType), classOf[Gt[_]], 4) - checkFilterPredicate(Literal(1) >= Cast('_1, IntegerType), classOf[LtEq[_]], 1) - checkFilterPredicate(Literal(4) <= Cast('_1, IntegerType), classOf[GtEq[_]], 4) - - checkFilterPredicate(!(Cast('_1, IntegerType) < 4), classOf[GtEq[_]], 4) - checkFilterPredicate( - Cast('_1, IntegerType) > 2 && Cast('_1, IntegerType) < 4, classOf[Operators.And], 3) - checkFilterPredicate( - Cast('_1, IntegerType) < 2 || Cast('_1, IntegerType) > 3, - classOf[Operators.Or], - Seq(Row(1), Row(4))) - } - } - - test("filter pushdown - integer") { - withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df => - checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row]) - checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_))) - - checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1) - checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_))) - - checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1) - checkFilterPredicate('_1 > 3, classOf[Gt[_]], 4) - checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1) - checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4) - - checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1) - checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1) - checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4) - checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1) - checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4) - - checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4) - checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3) - checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4))) - } - } - - test("filter pushdown - long") { - withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i.toLong)))) { implicit df => - checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row]) - checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_))) - - checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1) - checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_))) - - checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1) - checkFilterPredicate('_1 > 3, classOf[Gt[_]], 4) - checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1) - checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4) - - checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1) - checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1) - checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4) - checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1) - checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4) - - checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4) - checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3) - checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4))) - } - } - - test("filter pushdown - float") { - withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i.toFloat)))) { implicit df => - checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row]) - checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_))) - - checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1) - checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_))) - - checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1) - checkFilterPredicate('_1 > 3, classOf[Gt[_]], 4) - checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1) - checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4) - - checkFilterPredicate(Literal(1) === '_1, 
classOf[Eq[_]], 1) - checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1) - checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4) - checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1) - checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4) - - checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4) - checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3) - checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4))) - } - } - - test("filter pushdown - double") { - withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i.toDouble)))) { implicit df => - checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row]) - checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_))) - - checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1) - checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_))) - - checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1) - checkFilterPredicate('_1 > 3, classOf[Gt[_]], 4) - checkFilterPredicate('_1 <= 1, classOf[LtEq[_]], 1) - checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4) - - checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1) - checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1) - checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4) - checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1) - checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4) - - checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4) - checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3) - checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4))) - } - } - - test("filter pushdown - string") { - withParquetDataFrame((1 to 4).map(i => Tuple1(i.toString))) { implicit df => - checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row]) - checkFilterPredicate( - '_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(i => Row.apply(i.toString))) - - checkFilterPredicate('_1 === "1", classOf[Eq[_]], "1") - checkFilterPredicate( - '_1 !== "1", classOf[NotEq[_]], (2 to 4).map(i => Row.apply(i.toString))) - - checkFilterPredicate('_1 < "2", classOf[Lt[_]], "1") - checkFilterPredicate('_1 > "3", classOf[Gt[_]], "4") - checkFilterPredicate('_1 <= "1", classOf[LtEq[_]], "1") - checkFilterPredicate('_1 >= "4", classOf[GtEq[_]], "4") - - checkFilterPredicate(Literal("1") === '_1, classOf[Eq[_]], "1") - checkFilterPredicate(Literal("2") > '_1, classOf[Lt[_]], "1") - checkFilterPredicate(Literal("3") < '_1, classOf[Gt[_]], "4") - checkFilterPredicate(Literal("1") >= '_1, classOf[LtEq[_]], "1") - checkFilterPredicate(Literal("4") <= '_1, classOf[GtEq[_]], "4") - - checkFilterPredicate(!('_1 < "4"), classOf[GtEq[_]], "4") - checkFilterPredicate('_1 > "2" && '_1 < "4", classOf[Operators.And], "3") - checkFilterPredicate('_1 < "2" || '_1 > "3", classOf[Operators.Or], Seq(Row("1"), Row("4"))) - } - } - - test("filter pushdown - binary") { - implicit class IntToBinary(int: Int) { - def b: Array[Byte] = int.toString.getBytes("UTF-8") - } - - withParquetDataFrame((1 to 4).map(i => Tuple1(i.b))) { implicit df => - checkBinaryFilterPredicate('_1 === 1.b, classOf[Eq[_]], 1.b) - - checkBinaryFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row]) - checkBinaryFilterPredicate( - '_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(i => Row.apply(i.b)).toSeq) - - checkBinaryFilterPredicate( - '_1 !== 1.b, classOf[NotEq[_]], (2 to 4).map(i => Row.apply(i.b)).toSeq) - - checkBinaryFilterPredicate('_1 < 2.b, classOf[Lt[_]], 1.b) - checkBinaryFilterPredicate('_1 > 
3.b, classOf[Gt[_]], 4.b) - checkBinaryFilterPredicate('_1 <= 1.b, classOf[LtEq[_]], 1.b) - checkBinaryFilterPredicate('_1 >= 4.b, classOf[GtEq[_]], 4.b) - - checkBinaryFilterPredicate(Literal(1.b) === '_1, classOf[Eq[_]], 1.b) - checkBinaryFilterPredicate(Literal(2.b) > '_1, classOf[Lt[_]], 1.b) - checkBinaryFilterPredicate(Literal(3.b) < '_1, classOf[Gt[_]], 4.b) - checkBinaryFilterPredicate(Literal(1.b) >= '_1, classOf[LtEq[_]], 1.b) - checkBinaryFilterPredicate(Literal(4.b) <= '_1, classOf[GtEq[_]], 4.b) - - checkBinaryFilterPredicate(!('_1 < 4.b), classOf[GtEq[_]], 4.b) - checkBinaryFilterPredicate('_1 > 2.b && '_1 < 4.b, classOf[Operators.And], 3.b) - checkBinaryFilterPredicate( - '_1 < 2.b || '_1 > 3.b, classOf[Operators.Or], Seq(Row(1.b), Row(4.b))) - } - } - - test("SPARK-6554: don't push down predicates which reference partition columns") { - import sqlContext.implicits._ - - withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") { - withTempPath { dir => - val path = s"${dir.getCanonicalPath}/part=1" - (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path) - - // If the "part = 1" filter gets pushed down, this query will throw an exception since - // "part" is not a valid column in the actual Parquet file - checkAnswer( - sqlContext.read.parquet(path).filter("part = 1"), - (1 to 3).map(i => Row(i, i.toString, 1))) - } - } - } -}
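
As context for the NOTE at the top of the deleted ParquetFilterSuite: wrapping test values in Tuple1(Option(x)) forces Spark's schema inference to mark the column nullable, which is what lets the isNull/isNotNull assertions above exercise Parquet's Eq/NotEq filters against null. The standalone sketch below is not part of this commit; the object name is made up for illustration, and it assumes the Spark 1.5-era SQLContext API used by these suites.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    object NullableSchemaSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[1]").setAppName("nullable-schema-sketch"))
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        // A bare AnyVal infers a non-nullable column:
        //   root
        //    |-- _1: integer (nullable = false)
        sc.parallelize(Seq(Tuple1(1))).toDF().printSchema()

        // Option[Int] infers a nullable column, matching Parquet optional fields:
        //   root
        //    |-- _1: integer (nullable = true)
        sc.parallelize(Seq(Tuple1(Option(1)))).toDF().printSchema()

        sc.stop()
      }
    }

The other half of that NOTE can be observed the same way: filtering such a DataFrame with !('_1 < 4) and inspecting queryExecution.optimizedPlan shows the negation rewritten by BooleanSimplification, which is why the suite asserts classOf[GtEq[_]] rather than a Not-wrapped predicate.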