http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
deleted file mode 100644
index b415da5..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ /dev/null
@@ -1,436 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
-import scala.reflect.runtime.universe.TypeTag
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
-import org.apache.parquet.example.data.simple.SimpleGroup
-import org.apache.parquet.example.data.{Group, GroupWriter}
-import org.apache.parquet.hadoop.api.WriteSupport
-import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
-import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata}
-import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, ParquetOutputCommitter, ParquetWriter}
-import org.apache.parquet.io.api.RecordConsumer
-import org.apache.parquet.schema.{MessageType, MessageTypeParser}
-
-import org.apache.spark.SparkException
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.types._
-
-// Write support class for nested groups: ParquetWriter initializes GroupWriteSupport
-// with an empty configuration (it is, after all, not intended to be used this way?),
-// and its members are private, so we need to make our own in order to pass the schema
-// to the writer.
-private[parquet] class TestGroupWriteSupport(schema: MessageType) extends WriteSupport[Group] {
-  var groupWriter: GroupWriter = null
-
-  override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
-    groupWriter = new GroupWriter(recordConsumer, schema)
-  }
-
-  override def init(configuration: Configuration): WriteContext = {
-    new WriteContext(schema, new java.util.HashMap[String, String]())
-  }
-
-  override def write(record: Group) {
-    groupWriter.write(record)
-  }
-}
-
-/**
- * A test suite that tests basic Parquet I/O.
- */
-class ParquetIOSuite extends QueryTest with ParquetTest {
-  lazy val sqlContext = org.apache.spark.sql.test.TestSQLContext
-  import sqlContext.implicits._
-
-  /**
-   * Writes `data` to a Parquet file, reads it back, and checks file contents.
- */ - protected def checkParquetFile[T <: Product : ClassTag: TypeTag](data: Seq[T]): Unit = { - withParquetDataFrame(data)(r => checkAnswer(r, data.map(Row.fromTuple))) - } - - test("basic data types (without binary)") { - val data = (1 to 4).map { i => - (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble) - } - checkParquetFile(data) - } - - test("raw binary") { - val data = (1 to 4).map(i => Tuple1(Array.fill(3)(i.toByte))) - withParquetDataFrame(data) { df => - assertResult(data.map(_._1.mkString(",")).sorted) { - df.collect().map(_.getAs[Array[Byte]](0).mkString(",")).sorted - } - } - } - - test("string") { - val data = (1 to 4).map(i => Tuple1(i.toString)) - // Property spark.sql.parquet.binaryAsString shouldn't affect Parquet files written by Spark SQL - // as we store Spark SQL schema in the extra metadata. - withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "false")(checkParquetFile(data)) - withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true")(checkParquetFile(data)) - } - - test("fixed-length decimals") { - def makeDecimalRDD(decimal: DecimalType): DataFrame = - sqlContext.sparkContext - .parallelize(0 to 1000) - .map(i => Tuple1(i / 100.0)) - .toDF() - // Parquet doesn't allow column names with spaces, have to add an alias here - .select($"_1" cast decimal as "dec") - - for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17), (19, 0), (38, 37))) { - withTempPath { dir => - val data = makeDecimalRDD(DecimalType(precision, scale)) - data.write.parquet(dir.getCanonicalPath) - checkAnswer(sqlContext.read.parquet(dir.getCanonicalPath), data.collect().toSeq) - } - } - } - - test("date type") { - def makeDateRDD(): DataFrame = - sqlContext.sparkContext - .parallelize(0 to 1000) - .map(i => Tuple1(DateTimeUtils.toJavaDate(i))) - .toDF() - .select($"_1") - - withTempPath { dir => - val data = makeDateRDD() - data.write.parquet(dir.getCanonicalPath) - checkAnswer(sqlContext.read.parquet(dir.getCanonicalPath), data.collect().toSeq) - } - } - - test("map") { - val data = (1 to 4).map(i => Tuple1(Map(i -> s"val_$i"))) - checkParquetFile(data) - } - - test("array") { - val data = (1 to 4).map(i => Tuple1(Seq(i, i + 1))) - checkParquetFile(data) - } - - test("array and double") { - val data = (1 to 4).map(i => (i.toDouble, Seq(i.toDouble, (i + 1).toDouble))) - checkParquetFile(data) - } - - test("struct") { - val data = (1 to 4).map(i => Tuple1((i, s"val_$i"))) - withParquetDataFrame(data) { df => - // Structs are converted to `Row`s - checkAnswer(df, data.map { case Tuple1(struct) => - Row(Row(struct.productIterator.toSeq: _*)) - }) - } - } - - test("nested struct with array of array as field") { - val data = (1 to 4).map(i => Tuple1((i, Seq(Seq(s"val_$i"))))) - withParquetDataFrame(data) { df => - // Structs are converted to `Row`s - checkAnswer(df, data.map { case Tuple1(struct) => - Row(Row(struct.productIterator.toSeq: _*)) - }) - } - } - - test("nested map with struct as value type") { - val data = (1 to 4).map(i => Tuple1(Map(i -> (i, s"val_$i")))) - withParquetDataFrame(data) { df => - checkAnswer(df, data.map { case Tuple1(m) => - Row(m.mapValues(struct => Row(struct.productIterator.toSeq: _*))) - }) - } - } - - test("nulls") { - val allNulls = ( - null.asInstanceOf[java.lang.Boolean], - null.asInstanceOf[Integer], - null.asInstanceOf[java.lang.Long], - null.asInstanceOf[java.lang.Float], - null.asInstanceOf[java.lang.Double]) - - withParquetDataFrame(allNulls :: Nil) { df => - val rows = df.collect() - assert(rows.length === 1) - assert(rows.head === 
Row(Seq.fill(5)(null): _*)) - } - } - - test("nones") { - val allNones = ( - None.asInstanceOf[Option[Int]], - None.asInstanceOf[Option[Long]], - None.asInstanceOf[Option[String]]) - - withParquetDataFrame(allNones :: Nil) { df => - val rows = df.collect() - assert(rows.length === 1) - assert(rows.head === Row(Seq.fill(3)(null): _*)) - } - } - - test("compression codec") { - def compressionCodecFor(path: String): String = { - val codecs = ParquetTypesConverter - .readMetaData(new Path(path), Some(configuration)) - .getBlocks - .flatMap(_.getColumns) - .map(_.getCodec.name()) - .distinct - - assert(codecs.size === 1) - codecs.head - } - - val data = (0 until 10).map(i => (i, i.toString)) - - def checkCompressionCodec(codec: CompressionCodecName): Unit = { - withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) { - withParquetFile(data) { path => - assertResult(sqlContext.conf.parquetCompressionCodec.toUpperCase) { - compressionCodecFor(path) - } - } - } - } - - // Checks default compression codec - checkCompressionCodec(CompressionCodecName.fromConf(sqlContext.conf.parquetCompressionCodec)) - - checkCompressionCodec(CompressionCodecName.UNCOMPRESSED) - checkCompressionCodec(CompressionCodecName.GZIP) - checkCompressionCodec(CompressionCodecName.SNAPPY) - } - - test("read raw Parquet file") { - def makeRawParquetFile(path: Path): Unit = { - val schema = MessageTypeParser.parseMessageType( - """ - |message root { - | required boolean _1; - | required int32 _2; - | required int64 _3; - | required float _4; - | required double _5; - |} - """.stripMargin) - - val writeSupport = new TestGroupWriteSupport(schema) - val writer = new ParquetWriter[Group](path, writeSupport) - - (0 until 10).foreach { i => - val record = new SimpleGroup(schema) - record.add(0, i % 2 == 0) - record.add(1, i) - record.add(2, i.toLong) - record.add(3, i.toFloat) - record.add(4, i.toDouble) - writer.write(record) - } - - writer.close() - } - - withTempDir { dir => - val path = new Path(dir.toURI.toString, "part-r-0.parquet") - makeRawParquetFile(path) - checkAnswer(sqlContext.read.parquet(path.toString), (0 until 10).map { i => - Row(i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble) - }) - } - } - - test("write metadata") { - withTempPath { file => - val path = new Path(file.toURI.toString) - val fs = FileSystem.getLocal(configuration) - val attributes = ScalaReflection.attributesFor[(Int, String)] - ParquetTypesConverter.writeMetaData(attributes, path, configuration) - - assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE))) - assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE))) - - val metaData = ParquetTypesConverter.readMetaData(path, Some(configuration)) - val actualSchema = metaData.getFileMetaData.getSchema - val expectedSchema = ParquetTypesConverter.convertFromAttributes(attributes) - - actualSchema.checkContains(expectedSchema) - expectedSchema.checkContains(actualSchema) - } - } - - test("save - overwrite") { - withParquetFile((1 to 10).map(i => (i, i.toString))) { file => - val newData = (11 to 20).map(i => (i, i.toString)) - newData.toDF().write.format("parquet").mode(SaveMode.Overwrite).save(file) - checkAnswer(sqlContext.read.parquet(file), newData.map(Row.fromTuple)) - } - } - - test("save - ignore") { - val data = (1 to 10).map(i => (i, i.toString)) - withParquetFile(data) { file => - val newData = (11 to 20).map(i => (i, i.toString)) - newData.toDF().write.format("parquet").mode(SaveMode.Ignore).save(file) - checkAnswer(sqlContext.read.parquet(file), 
data.map(Row.fromTuple)) - } - } - - test("save - throw") { - val data = (1 to 10).map(i => (i, i.toString)) - withParquetFile(data) { file => - val newData = (11 to 20).map(i => (i, i.toString)) - val errorMessage = intercept[Throwable] { - newData.toDF().write.format("parquet").mode(SaveMode.ErrorIfExists).save(file) - }.getMessage - assert(errorMessage.contains("already exists")) - } - } - - test("save - append") { - val data = (1 to 10).map(i => (i, i.toString)) - withParquetFile(data) { file => - val newData = (11 to 20).map(i => (i, i.toString)) - newData.toDF().write.format("parquet").mode(SaveMode.Append).save(file) - checkAnswer(sqlContext.read.parquet(file), (data ++ newData).map(Row.fromTuple)) - } - } - - test("SPARK-6315 regression test") { - // Spark 1.1 and prior versions write Spark schema as case class string into Parquet metadata. - // This has been deprecated by JSON format since 1.2. Notice that, 1.3 further refactored data - // types API, and made StructType.fields an array. This makes the result of StructType.toString - // different from prior versions: there's no "Seq" wrapping the fields part in the string now. - val sparkSchema = - "StructType(Seq(StructField(a,BooleanType,false),StructField(b,IntegerType,false)))" - - // The Parquet schema is intentionally made different from the Spark schema. Because the new - // Parquet data source simply falls back to the Parquet schema once it fails to parse the Spark - // schema. By making these two different, we are able to assert the old style case class string - // is parsed successfully. - val parquetSchema = MessageTypeParser.parseMessageType( - """message root { - | required int32 c; - |} - """.stripMargin) - - withTempPath { location => - val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString) - val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark") - val path = new Path(location.getCanonicalPath) - - ParquetFileWriter.writeMetadataFile( - sqlContext.sparkContext.hadoopConfiguration, - path, - new Footer(path, new ParquetMetadata(fileMetadata, Nil)) :: Nil) - - assertResult(sqlContext.read.parquet(path.toString).schema) { - StructType( - StructField("a", BooleanType, nullable = false) :: - StructField("b", IntegerType, nullable = false) :: - Nil) - } - } - } - - test("SPARK-6352 DirectParquetOutputCommitter") { - val clonedConf = new Configuration(configuration) - - // Write to a parquet file and let it fail. - // _temporary should be missing if direct output committer works. 
-    try {
-      configuration.set("spark.sql.parquet.output.committer.class",
-        "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
-      sqlContext.udf.register("div0", (x: Int) => x / 0)
-      withTempPath { dir =>
-        intercept[org.apache.spark.SparkException] {
-          sqlContext.sql("select div0(1)").write.parquet(dir.getCanonicalPath)
-        }
-        val path = new Path(dir.getCanonicalPath, "_temporary")
-        val fs = path.getFileSystem(configuration)
-        assert(!fs.exists(path))
-      }
-    } finally {
-      // Hadoop 1 doesn't have `Configuration.unset`
-      configuration.clear()
-      clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
-    }
-  }
-
-  test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overridden") {
-    withTempPath { dir =>
-      val clonedConf = new Configuration(configuration)
-
-      configuration.set(
-        SQLConf.OUTPUT_COMMITTER_CLASS.key, classOf[ParquetOutputCommitter].getCanonicalName)
-
-      configuration.set(
-        "spark.sql.parquet.output.committer.class",
-        classOf[BogusParquetOutputCommitter].getCanonicalName)
-
-      try {
-        val message = intercept[SparkException] {
-          sqlContext.range(0, 1).write.parquet(dir.getCanonicalPath)
-        }.getCause.getMessage
-        assert(message === "Intentional exception for testing purposes")
-      } finally {
-        // Hadoop 1 doesn't have `Configuration.unset`
-        configuration.clear()
-        clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
-      }
-    }
-  }
-
-  test("SPARK-6330 regression test") {
-    // In 1.3.0, saving to a filesystem other than file: without configuring core-site.xml would
-    // fail with: IllegalArgumentException: Wrong FS: hdfs://..., expected: file:///
-    intercept[Throwable] {
-      sqlContext.read.parquet("file:///nonexistent")
-    }
-    val errorMessage = intercept[Throwable] {
-      sqlContext.read.parquet("hdfs://nonexistent")
-    }.toString
-    assert(errorMessage.contains("UnknownHostException"))
-  }
-}
-
-class BogusParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
-  extends ParquetOutputCommitter(outputPath, context) {
-
-  override def commitJob(jobContext: JobContext): Unit = {
-    sys.error("Intentional exception for testing purposes")
-  }
-}
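Both committer tests above rely on the same snapshot-and-restore idiom for the shared Hadoop Configuration, since Hadoop 1 lacks Configuration.unset. A minimal sketch of that idiom, extracted as a hypothetical helper (withHadoopConfKey is not part of this commit):

import scala.collection.JavaConversions._

import org.apache.hadoop.conf.Configuration

// Runs `body` with `key` temporarily set on `conf`, then restores the original
// entries. Hadoop 1 has no Configuration.unset, so as in the tests above we
// clear the whole Configuration and copy every entry back from a clone.
def withHadoopConfKey(conf: Configuration, key: String, value: String)(body: => Unit): Unit = {
  val clonedConf = new Configuration(conf) // snapshot before mutating
  try {
    conf.set(key, value)
    body
  } finally {
    conf.clear()
    clonedConf.foreach(entry => conf.set(entry.getKey, entry.getValue))
  }
}
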
http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala deleted file mode 100644 index 2eef101..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala +++ /dev/null @@ -1,602 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.parquet - -import java.io.File -import java.math.BigInteger -import java.sql.Timestamp - -import scala.collection.mutable.ArrayBuffer - -import com.google.common.io.Files -import org.apache.hadoop.fs.Path - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.execution.datasources.{LogicalRelation, PartitionSpec, Partition, PartitioningUtils} -import org.apache.spark.sql.types._ -import org.apache.spark.sql._ -import org.apache.spark.unsafe.types.UTF8String -import PartitioningUtils._ - -// The data where the partitioning key exists only in the directory structure. 
-case class ParquetData(intField: Int, stringField: String) - -// The data that also includes the partitioning key -case class ParquetDataWithKey(intField: Int, pi: Int, stringField: String, ps: String) - -class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest { - - override lazy val sqlContext: SQLContext = org.apache.spark.sql.test.TestSQLContext - import sqlContext.implicits._ - import sqlContext.sql - - val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__" - - test("column type inference") { - def check(raw: String, literal: Literal): Unit = { - assert(inferPartitionColumnValue(raw, defaultPartitionName, true) === literal) - } - - check("10", Literal.create(10, IntegerType)) - check("1000000000000000", Literal.create(1000000000000000L, LongType)) - check("1.5", Literal.create(1.5, DoubleType)) - check("hello", Literal.create("hello", StringType)) - check(defaultPartitionName, Literal.create(null, NullType)) - } - - test("parse partition") { - def check(path: String, expected: Option[PartitionValues]): Unit = { - assert(expected === parsePartition(new Path(path), defaultPartitionName, true)) - } - - def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = { - val message = intercept[T] { - parsePartition(new Path(path), defaultPartitionName, true).get - }.getMessage - - assert(message.contains(expected)) - } - - check("file://path/a=10", Some { - PartitionValues( - ArrayBuffer("a"), - ArrayBuffer(Literal.create(10, IntegerType))) - }) - - check("file://path/a=10/b=hello/c=1.5", Some { - PartitionValues( - ArrayBuffer("a", "b", "c"), - ArrayBuffer( - Literal.create(10, IntegerType), - Literal.create("hello", StringType), - Literal.create(1.5, DoubleType))) - }) - - check("file://path/a=10/b_hello/c=1.5", Some { - PartitionValues( - ArrayBuffer("c"), - ArrayBuffer(Literal.create(1.5, DoubleType))) - }) - - check("file:///", None) - check("file:///path/_temporary", None) - check("file:///path/_temporary/c=1.5", None) - check("file:///path/_temporary/path", None) - check("file://path/a=10/_temporary/c=1.5", None) - check("file://path/a=10/c=1.5/_temporary", None) - - checkThrows[AssertionError]("file://path/=10", "Empty partition column name") - checkThrows[AssertionError]("file://path/a=", "Empty partition column value") - } - - test("parse partitions") { - def check(paths: Seq[String], spec: PartitionSpec): Unit = { - assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName, true) === spec) - } - - check(Seq( - "hdfs://host:9000/path/a=10/b=hello"), - PartitionSpec( - StructType(Seq( - StructField("a", IntegerType), - StructField("b", StringType))), - Seq(Partition(InternalRow(10, UTF8String.fromString("hello")), - "hdfs://host:9000/path/a=10/b=hello")))) - - check(Seq( - "hdfs://host:9000/path/a=10/b=20", - "hdfs://host:9000/path/a=10.5/b=hello"), - PartitionSpec( - StructType(Seq( - StructField("a", DoubleType), - StructField("b", StringType))), - Seq( - Partition(InternalRow(10, UTF8String.fromString("20")), - "hdfs://host:9000/path/a=10/b=20"), - Partition(InternalRow(10.5, UTF8String.fromString("hello")), - "hdfs://host:9000/path/a=10.5/b=hello")))) - - check(Seq( - "hdfs://host:9000/path/_temporary", - "hdfs://host:9000/path/a=10/b=20", - "hdfs://host:9000/path/a=10.5/b=hello", - "hdfs://host:9000/path/a=10.5/_temporary", - "hdfs://host:9000/path/a=10.5/_TeMpOrArY", - "hdfs://host:9000/path/a=10.5/b=hello/_temporary", - "hdfs://host:9000/path/a=10.5/b=hello/_TEMPORARY", - "hdfs://host:9000/path/_temporary/path", - 
"hdfs://host:9000/path/a=11/_temporary/path", - "hdfs://host:9000/path/a=10.5/b=world/_temporary/path"), - PartitionSpec( - StructType(Seq( - StructField("a", DoubleType), - StructField("b", StringType))), - Seq( - Partition(InternalRow(10, UTF8String.fromString("20")), - "hdfs://host:9000/path/a=10/b=20"), - Partition(InternalRow(10.5, UTF8String.fromString("hello")), - "hdfs://host:9000/path/a=10.5/b=hello")))) - - check(Seq( - s"hdfs://host:9000/path/a=10/b=20", - s"hdfs://host:9000/path/a=$defaultPartitionName/b=hello"), - PartitionSpec( - StructType(Seq( - StructField("a", IntegerType), - StructField("b", StringType))), - Seq( - Partition(InternalRow(10, UTF8String.fromString("20")), - s"hdfs://host:9000/path/a=10/b=20"), - Partition(InternalRow(null, UTF8String.fromString("hello")), - s"hdfs://host:9000/path/a=$defaultPartitionName/b=hello")))) - - check(Seq( - s"hdfs://host:9000/path/a=10/b=$defaultPartitionName", - s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName"), - PartitionSpec( - StructType(Seq( - StructField("a", DoubleType), - StructField("b", StringType))), - Seq( - Partition(InternalRow(10, null), s"hdfs://host:9000/path/a=10/b=$defaultPartitionName"), - Partition(InternalRow(10.5, null), - s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName")))) - - check(Seq( - s"hdfs://host:9000/path1", - s"hdfs://host:9000/path2"), - PartitionSpec.emptySpec) - } - - test("parse partitions with type inference disabled") { - def check(paths: Seq[String], spec: PartitionSpec): Unit = { - assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName, false) === spec) - } - - check(Seq( - "hdfs://host:9000/path/a=10/b=hello"), - PartitionSpec( - StructType(Seq( - StructField("a", StringType), - StructField("b", StringType))), - Seq(Partition(InternalRow(UTF8String.fromString("10"), UTF8String.fromString("hello")), - "hdfs://host:9000/path/a=10/b=hello")))) - - check(Seq( - "hdfs://host:9000/path/a=10/b=20", - "hdfs://host:9000/path/a=10.5/b=hello"), - PartitionSpec( - StructType(Seq( - StructField("a", StringType), - StructField("b", StringType))), - Seq( - Partition(InternalRow(UTF8String.fromString("10"), UTF8String.fromString("20")), - "hdfs://host:9000/path/a=10/b=20"), - Partition(InternalRow(UTF8String.fromString("10.5"), UTF8String.fromString("hello")), - "hdfs://host:9000/path/a=10.5/b=hello")))) - - check(Seq( - "hdfs://host:9000/path/_temporary", - "hdfs://host:9000/path/a=10/b=20", - "hdfs://host:9000/path/a=10.5/b=hello", - "hdfs://host:9000/path/a=10.5/_temporary", - "hdfs://host:9000/path/a=10.5/_TeMpOrArY", - "hdfs://host:9000/path/a=10.5/b=hello/_temporary", - "hdfs://host:9000/path/a=10.5/b=hello/_TEMPORARY", - "hdfs://host:9000/path/_temporary/path", - "hdfs://host:9000/path/a=11/_temporary/path", - "hdfs://host:9000/path/a=10.5/b=world/_temporary/path"), - PartitionSpec( - StructType(Seq( - StructField("a", StringType), - StructField("b", StringType))), - Seq( - Partition(InternalRow(UTF8String.fromString("10"), UTF8String.fromString("20")), - "hdfs://host:9000/path/a=10/b=20"), - Partition(InternalRow(UTF8String.fromString("10.5"), UTF8String.fromString("hello")), - "hdfs://host:9000/path/a=10.5/b=hello")))) - - check(Seq( - s"hdfs://host:9000/path/a=10/b=20", - s"hdfs://host:9000/path/a=$defaultPartitionName/b=hello"), - PartitionSpec( - StructType(Seq( - StructField("a", StringType), - StructField("b", StringType))), - Seq( - Partition(InternalRow(UTF8String.fromString("10"), UTF8String.fromString("20")), - s"hdfs://host:9000/path/a=10/b=20"), - 
Partition(InternalRow(null, UTF8String.fromString("hello")), - s"hdfs://host:9000/path/a=$defaultPartitionName/b=hello")))) - - check(Seq( - s"hdfs://host:9000/path/a=10/b=$defaultPartitionName", - s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName"), - PartitionSpec( - StructType(Seq( - StructField("a", StringType), - StructField("b", StringType))), - Seq( - Partition(InternalRow(UTF8String.fromString("10"), null), - s"hdfs://host:9000/path/a=10/b=$defaultPartitionName"), - Partition(InternalRow(UTF8String.fromString("10.5"), null), - s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName")))) - - check(Seq( - s"hdfs://host:9000/path1", - s"hdfs://host:9000/path2"), - PartitionSpec.emptySpec) - } - - test("read partitioned table - normal case") { - withTempDir { base => - for { - pi <- Seq(1, 2) - ps <- Seq("foo", "bar") - } { - val dir = makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps) - makeParquetFile( - (1 to 10).map(i => ParquetData(i, i.toString)), - dir) - // Introduce _temporary dir to test the robustness of the schema discovery process. - new File(dir.toString, "_temporary").mkdir() - } - // Introduce _temporary dir to the base dir the robustness of the schema discovery process. - new File(base.getCanonicalPath, "_temporary").mkdir() - - sqlContext.read.parquet(base.getCanonicalPath).registerTempTable("t") - - withTempTable("t") { - checkAnswer( - sql("SELECT * FROM t"), - for { - i <- 1 to 10 - pi <- Seq(1, 2) - ps <- Seq("foo", "bar") - } yield Row(i, i.toString, pi, ps)) - - checkAnswer( - sql("SELECT intField, pi FROM t"), - for { - i <- 1 to 10 - pi <- Seq(1, 2) - _ <- Seq("foo", "bar") - } yield Row(i, pi)) - - checkAnswer( - sql("SELECT * FROM t WHERE pi = 1"), - for { - i <- 1 to 10 - ps <- Seq("foo", "bar") - } yield Row(i, i.toString, 1, ps)) - - checkAnswer( - sql("SELECT * FROM t WHERE ps = 'foo'"), - for { - i <- 1 to 10 - pi <- Seq(1, 2) - } yield Row(i, i.toString, pi, "foo")) - } - } - } - - test("read partitioned table - partition key included in Parquet file") { - withTempDir { base => - for { - pi <- Seq(1, 2) - ps <- Seq("foo", "bar") - } { - makeParquetFile( - (1 to 10).map(i => ParquetDataWithKey(i, pi, i.toString, ps)), - makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps)) - } - - sqlContext.read.parquet(base.getCanonicalPath).registerTempTable("t") - - withTempTable("t") { - checkAnswer( - sql("SELECT * FROM t"), - for { - i <- 1 to 10 - pi <- Seq(1, 2) - ps <- Seq("foo", "bar") - } yield Row(i, pi, i.toString, ps)) - - checkAnswer( - sql("SELECT intField, pi FROM t"), - for { - i <- 1 to 10 - pi <- Seq(1, 2) - _ <- Seq("foo", "bar") - } yield Row(i, pi)) - - checkAnswer( - sql("SELECT * FROM t WHERE pi = 1"), - for { - i <- 1 to 10 - ps <- Seq("foo", "bar") - } yield Row(i, 1, i.toString, ps)) - - checkAnswer( - sql("SELECT * FROM t WHERE ps = 'foo'"), - for { - i <- 1 to 10 - pi <- Seq(1, 2) - } yield Row(i, pi, i.toString, "foo")) - } - } - } - - test("read partitioned table - with nulls") { - withTempDir { base => - for { - // Must be `Integer` rather than `Int` here. `null.asInstanceOf[Int]` results in a zero... 
- pi <- Seq(1, null.asInstanceOf[Integer]) - ps <- Seq("foo", null.asInstanceOf[String]) - } { - makeParquetFile( - (1 to 10).map(i => ParquetData(i, i.toString)), - makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps)) - } - - val parquetRelation = sqlContext.read.format("parquet").load(base.getCanonicalPath) - parquetRelation.registerTempTable("t") - - withTempTable("t") { - checkAnswer( - sql("SELECT * FROM t"), - for { - i <- 1 to 10 - pi <- Seq(1, null.asInstanceOf[Integer]) - ps <- Seq("foo", null.asInstanceOf[String]) - } yield Row(i, i.toString, pi, ps)) - - checkAnswer( - sql("SELECT * FROM t WHERE pi IS NULL"), - for { - i <- 1 to 10 - ps <- Seq("foo", null.asInstanceOf[String]) - } yield Row(i, i.toString, null, ps)) - - checkAnswer( - sql("SELECT * FROM t WHERE ps IS NULL"), - for { - i <- 1 to 10 - pi <- Seq(1, null.asInstanceOf[Integer]) - } yield Row(i, i.toString, pi, null)) - } - } - } - - test("read partitioned table - with nulls and partition keys are included in Parquet file") { - withTempDir { base => - for { - pi <- Seq(1, 2) - ps <- Seq("foo", null.asInstanceOf[String]) - } { - makeParquetFile( - (1 to 10).map(i => ParquetDataWithKey(i, pi, i.toString, ps)), - makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps)) - } - - val parquetRelation = sqlContext.read.format("parquet").load(base.getCanonicalPath) - parquetRelation.registerTempTable("t") - - withTempTable("t") { - checkAnswer( - sql("SELECT * FROM t"), - for { - i <- 1 to 10 - pi <- Seq(1, 2) - ps <- Seq("foo", null.asInstanceOf[String]) - } yield Row(i, pi, i.toString, ps)) - - checkAnswer( - sql("SELECT * FROM t WHERE ps IS NULL"), - for { - i <- 1 to 10 - pi <- Seq(1, 2) - } yield Row(i, pi, i.toString, null)) - } - } - } - - test("read partitioned table - merging compatible schemas") { - withTempDir { base => - makeParquetFile( - (1 to 10).map(i => Tuple1(i)).toDF("intField"), - makePartitionDir(base, defaultPartitionName, "pi" -> 1)) - - makeParquetFile( - (1 to 10).map(i => (i, i.toString)).toDF("intField", "stringField"), - makePartitionDir(base, defaultPartitionName, "pi" -> 2)) - - sqlContext - .read - .option("mergeSchema", "true") - .format("parquet") - .load(base.getCanonicalPath) - .registerTempTable("t") - - withTempTable("t") { - checkAnswer( - sql("SELECT * FROM t"), - (1 to 10).map(i => Row(i, null, 1)) ++ (1 to 10).map(i => Row(i, i.toString, 2))) - } - } - } - - test("SPARK-7749 Non-partitioned table should have empty partition spec") { - withTempPath { dir => - (1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath) - val queryExecution = sqlContext.read.parquet(dir.getCanonicalPath).queryExecution - queryExecution.analyzed.collectFirst { - case LogicalRelation(relation: ParquetRelation) => - assert(relation.partitionSpec === PartitionSpec.emptySpec) - }.getOrElse { - fail(s"Expecting a ParquetRelation2, but got:\n$queryExecution") - } - } - } - - test("SPARK-7847: Dynamic partition directory path escaping and unescaping") { - withTempPath { dir => - val df = Seq("/", "[]", "?").zipWithIndex.map(_.swap).toDF("i", "s") - df.write.format("parquet").partitionBy("s").save(dir.getCanonicalPath) - checkAnswer(sqlContext.read.parquet(dir.getCanonicalPath), df.collect()) - } - } - - test("Various partition value types") { - val row = - Row( - 100.toByte, - 40000.toShort, - Int.MaxValue, - Long.MaxValue, - 1.5.toFloat, - 4.5, - new java.math.BigDecimal(new BigInteger("212500"), 5), - new java.math.BigDecimal(2.125), - 
java.sql.Date.valueOf("2015-05-23"), - new Timestamp(0), - "This is a string, /[]?=:", - "This is not a partition column") - - // BooleanType is not supported yet - val partitionColumnTypes = - Seq( - ByteType, - ShortType, - IntegerType, - LongType, - FloatType, - DoubleType, - DecimalType(10, 5), - DecimalType.SYSTEM_DEFAULT, - DateType, - TimestampType, - StringType) - - val partitionColumns = partitionColumnTypes.zipWithIndex.map { - case (t, index) => StructField(s"p_$index", t) - } - - val schema = StructType(partitionColumns :+ StructField(s"i", StringType)) - val df = sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(row :: Nil), schema) - - withTempPath { dir => - df.write.format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString) - val fields = schema.map(f => Column(f.name).cast(f.dataType)) - checkAnswer(sqlContext.read.load(dir.toString).select(fields: _*), row) - } - } - - test("SPARK-8037: Ignores files whose name starts with dot") { - withTempPath { dir => - val df = (1 to 3).map(i => (i, i, i, i)).toDF("a", "b", "c", "d") - - df.write - .format("parquet") - .partitionBy("b", "c", "d") - .save(dir.getCanonicalPath) - - Files.touch(new File(s"${dir.getCanonicalPath}/b=1", ".DS_Store")) - Files.createParentDirs(new File(s"${dir.getCanonicalPath}/b=1/c=1/.foo/bar")) - - checkAnswer(sqlContext.read.format("parquet").load(dir.getCanonicalPath), df) - } - } - - test("listConflictingPartitionColumns") { - def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]): String = { - val conflictingColNameLists = colNameLists.zipWithIndex.map { case (list, index) => - s"\tPartition column name list #$index: $list" - }.mkString("\n", "\n", "\n") - - // scalastyle:off - s"""Conflicting partition column names detected: - |$conflictingColNameLists - |For partitioned table directories, data files should only live in leaf directories. - |And directories at the same level should have the same partition column name. 
- |Please check the following directories for unexpected files or inconsistent partition column names: - |${paths.map("\t" + _).mkString("\n", "\n", "")} - """.stripMargin.trim - // scalastyle:on - } - - assert( - listConflictingPartitionColumns( - Seq( - (new Path("file:/tmp/foo/a=1"), PartitionValues(Seq("a"), Seq(Literal(1)))), - (new Path("file:/tmp/foo/b=1"), PartitionValues(Seq("b"), Seq(Literal(1)))))).trim === - makeExpectedMessage(Seq("a", "b"), Seq("file:/tmp/foo/a=1", "file:/tmp/foo/b=1"))) - - assert( - listConflictingPartitionColumns( - Seq( - (new Path("file:/tmp/foo/a=1/_temporary"), PartitionValues(Seq("a"), Seq(Literal(1)))), - (new Path("file:/tmp/foo/a=1"), PartitionValues(Seq("a"), Seq(Literal(1)))))).trim === - makeExpectedMessage( - Seq("a"), - Seq("file:/tmp/foo/a=1/_temporary", "file:/tmp/foo/a=1"))) - - assert( - listConflictingPartitionColumns( - Seq( - (new Path("file:/tmp/foo/a=1"), - PartitionValues(Seq("a"), Seq(Literal(1)))), - (new Path("file:/tmp/foo/a=1/b=foo"), - PartitionValues(Seq("a", "b"), Seq(Literal(1), Literal("foo")))))).trim === - makeExpectedMessage( - Seq("a", "a, b"), - Seq("file:/tmp/foo/a=1", "file:/tmp/foo/a=1/b=foo"))) - } - - test("Parallel partition discovery") { - withTempPath { dir => - withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "1") { - val path = dir.getCanonicalPath - val df = sqlContext.range(5).select('id as 'a, 'id as 'b, 'id as 'c).coalesce(1) - df.write.partitionBy("b", "c").parquet(path) - checkAnswer(sqlContext.read.parquet(path), df) - } - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala deleted file mode 100644 index 5c65a8e..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.parquet - -import java.io.File - -import org.apache.hadoop.fs.Path - -import org.apache.spark.sql.types._ -import org.apache.spark.sql.{QueryTest, Row, SQLConf} -import org.apache.spark.util.Utils - -/** - * A test suite that tests various Parquet queries. 
- */ -class ParquetQuerySuite extends QueryTest with ParquetTest { - lazy val sqlContext = org.apache.spark.sql.test.TestSQLContext - import sqlContext.sql - - test("simple select queries") { - withParquetTable((0 until 10).map(i => (i, i.toString)), "t") { - checkAnswer(sql("SELECT _1 FROM t where t._1 > 5"), (6 until 10).map(Row.apply(_))) - checkAnswer(sql("SELECT _1 FROM t as tmp where tmp._1 < 5"), (0 until 5).map(Row.apply(_))) - } - } - - test("appending") { - val data = (0 until 10).map(i => (i, i.toString)) - sqlContext.createDataFrame(data).toDF("c1", "c2").registerTempTable("tmp") - withParquetTable(data, "t") { - sql("INSERT INTO TABLE t SELECT * FROM tmp") - checkAnswer(sqlContext.table("t"), (data ++ data).map(Row.fromTuple)) - } - sqlContext.catalog.unregisterTable(Seq("tmp")) - } - - test("overwriting") { - val data = (0 until 10).map(i => (i, i.toString)) - sqlContext.createDataFrame(data).toDF("c1", "c2").registerTempTable("tmp") - withParquetTable(data, "t") { - sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp") - checkAnswer(sqlContext.table("t"), data.map(Row.fromTuple)) - } - sqlContext.catalog.unregisterTable(Seq("tmp")) - } - - test("self-join") { - // 4 rows, cells of column 1 of row 2 and row 4 are null - val data = (1 to 4).map { i => - val maybeInt = if (i % 2 == 0) None else Some(i) - (maybeInt, i.toString) - } - - withParquetTable(data, "t") { - val selfJoin = sql("SELECT * FROM t x JOIN t y WHERE x._1 = y._1") - val queryOutput = selfJoin.queryExecution.analyzed.output - - assertResult(4, "Field count mismatches")(queryOutput.size) - assertResult(2, "Duplicated expression ID in query plan:\n $selfJoin") { - queryOutput.filter(_.name == "_1").map(_.exprId).size - } - - checkAnswer(selfJoin, List(Row(1, "1", 1, "1"), Row(3, "3", 3, "3"))) - } - } - - test("nested data - struct with array field") { - val data = (1 to 10).map(i => Tuple1((i, Seq("val_$i")))) - withParquetTable(data, "t") { - checkAnswer(sql("SELECT _1._2[0] FROM t"), data.map { - case Tuple1((_, Seq(string))) => Row(string) - }) - } - } - - test("nested data - array of struct") { - val data = (1 to 10).map(i => Tuple1(Seq(i -> "val_$i"))) - withParquetTable(data, "t") { - checkAnswer(sql("SELECT _1[0]._2 FROM t"), data.map { - case Tuple1(Seq((_, string))) => Row(string) - }) - } - } - - test("SPARK-1913 regression: columns only referenced by pushed down filters should remain") { - withParquetTable((1 to 10).map(Tuple1.apply), "t") { - checkAnswer(sql("SELECT _1 FROM t WHERE _1 < 10"), (1 to 9).map(Row.apply(_))) - } - } - - test("SPARK-5309 strings stored using dictionary compression in parquet") { - withParquetTable((0 until 1000).map(i => ("same", "run_" + i /100, 1)), "t") { - - checkAnswer(sql("SELECT _1, _2, SUM(_3) FROM t GROUP BY _1, _2"), - (0 until 10).map(i => Row("same", "run_" + i, 100))) - - checkAnswer(sql("SELECT _1, _2, SUM(_3) FROM t WHERE _2 = 'run_5' GROUP BY _1, _2"), - List(Row("same", "run_5", 100))) - } - } - - test("SPARK-6917 DecimalType should work with non-native types") { - val data = (1 to 10).map(i => Row(Decimal(i, 18, 0), new java.sql.Timestamp(i))) - val schema = StructType(List(StructField("d", DecimalType(18, 0), false), - StructField("time", TimestampType, false)).toArray) - withTempPath { file => - val df = sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(data), schema) - df.write.parquet(file.getCanonicalPath) - val df2 = sqlContext.read.parquet(file.getCanonicalPath) - checkAnswer(df2, df.collect().toSeq) - } - } - - test("Enabling/disabling 
merging partfiles when merging parquet schema") { - def testSchemaMerging(expectedColumnNumber: Int): Unit = { - withTempDir { dir => - val basePath = dir.getCanonicalPath - sqlContext.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString) - sqlContext.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=2").toString) - // delete summary files, so if we don't merge part-files, one column will not be included. - Utils.deleteRecursively(new File(basePath + "/foo=1/_metadata")) - Utils.deleteRecursively(new File(basePath + "/foo=1/_common_metadata")) - assert(sqlContext.read.parquet(basePath).columns.length === expectedColumnNumber) - } - } - - withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true", - SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "true") { - testSchemaMerging(2) - } - - withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true", - SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "false") { - testSchemaMerging(3) - } - } - - test("Enabling/disabling schema merging") { - def testSchemaMerging(expectedColumnNumber: Int): Unit = { - withTempDir { dir => - val basePath = dir.getCanonicalPath - sqlContext.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString) - sqlContext.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=2").toString) - assert(sqlContext.read.parquet(basePath).columns.length === expectedColumnNumber) - } - } - - withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") { - testSchemaMerging(3) - } - - withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "false") { - testSchemaMerging(2) - } - } - - test("SPARK-8990 DataFrameReader.parquet() should respect user specified options") { - withTempPath { dir => - val basePath = dir.getCanonicalPath - sqlContext.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString) - sqlContext.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=a").toString) - - // Disables the global SQL option for schema merging - withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "false") { - assertResult(2) { - // Disables schema merging via data source option - sqlContext.read.option("mergeSchema", "false").parquet(basePath).columns.length - } - - assertResult(3) { - // Enables schema merging via data source option - sqlContext.read.option("mergeSchema", "true").parquet(basePath).columns.length - } - } - } - } - - test("SPARK-9119 Decimal should be correctly written into parquet") { - withTempPath { dir => - val basePath = dir.getCanonicalPath - val schema = StructType(Array(StructField("name", DecimalType(10, 5), false))) - val rowRDD = sqlContext.sparkContext.parallelize(Array(Row(Decimal("67123.45")))) - val df = sqlContext.createDataFrame(rowRDD, schema) - df.write.parquet(basePath) - - val decimal = sqlContext.read.parquet(basePath).first().getDecimal(0) - assert(Decimal("67123.45") === Decimal(decimal)) - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala deleted file mode 100644 index 4a0b3b6..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala +++ /dev/null @@ -1,916 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more 
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import scala.reflect.ClassTag
-import scala.reflect.runtime.universe.TypeTag
-
-import org.apache.parquet.schema.MessageTypeParser
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.test.TestSQLContext
-import org.apache.spark.sql.types._
-
-abstract class ParquetSchemaTest extends SparkFunSuite with ParquetTest {
-  val sqlContext = TestSQLContext
-
-  /**
-   * Checks whether the reflected Parquet message type for product type `T` conforms to
-   * `messageType`.
-   */
-  protected def testSchemaInference[T <: Product: ClassTag: TypeTag](
-      testName: String,
-      messageType: String,
-      binaryAsString: Boolean = true,
-      int96AsTimestamp: Boolean = true,
-      followParquetFormatSpec: Boolean = false,
-      isThriftDerived: Boolean = false): Unit = {
-    testSchema(
-      testName,
-      StructType.fromAttributes(ScalaReflection.attributesFor[T]),
-      messageType,
-      binaryAsString,
-      int96AsTimestamp,
-      followParquetFormatSpec,
-      isThriftDerived)
-  }
-
-  protected def testParquetToCatalyst(
-      testName: String,
-      sqlSchema: StructType,
-      parquetSchema: String,
-      binaryAsString: Boolean = true,
-      int96AsTimestamp: Boolean = true,
-      followParquetFormatSpec: Boolean = false,
-      isThriftDerived: Boolean = false): Unit = {
-    val converter = new CatalystSchemaConverter(
-      assumeBinaryIsString = binaryAsString,
-      assumeInt96IsTimestamp = int96AsTimestamp,
-      followParquetFormatSpec = followParquetFormatSpec)
-
-    test(s"sql <= parquet: $testName") {
-      val actual = converter.convert(MessageTypeParser.parseMessageType(parquetSchema))
-      val expected = sqlSchema
-      assert(
-        actual === expected,
-        s"""Schema mismatch.
- |Expected schema: ${expected.json} - |Actual schema: ${actual.json} - """.stripMargin) - } - } - - protected def testCatalystToParquet( - testName: String, - sqlSchema: StructType, - parquetSchema: String, - binaryAsString: Boolean = true, - int96AsTimestamp: Boolean = true, - followParquetFormatSpec: Boolean = false, - isThriftDerived: Boolean = false): Unit = { - val converter = new CatalystSchemaConverter( - assumeBinaryIsString = binaryAsString, - assumeInt96IsTimestamp = int96AsTimestamp, - followParquetFormatSpec = followParquetFormatSpec) - - test(s"sql => parquet: $testName") { - val actual = converter.convert(sqlSchema) - val expected = MessageTypeParser.parseMessageType(parquetSchema) - actual.checkContains(expected) - expected.checkContains(actual) - } - } - - protected def testSchema( - testName: String, - sqlSchema: StructType, - parquetSchema: String, - binaryAsString: Boolean = true, - int96AsTimestamp: Boolean = true, - followParquetFormatSpec: Boolean = false, - isThriftDerived: Boolean = false): Unit = { - - testCatalystToParquet( - testName, - sqlSchema, - parquetSchema, - binaryAsString, - int96AsTimestamp, - followParquetFormatSpec, - isThriftDerived) - - testParquetToCatalyst( - testName, - sqlSchema, - parquetSchema, - binaryAsString, - int96AsTimestamp, - followParquetFormatSpec, - isThriftDerived) - } -} - -class ParquetSchemaInferenceSuite extends ParquetSchemaTest { - testSchemaInference[(Boolean, Int, Long, Float, Double, Array[Byte])]( - "basic types", - """ - |message root { - | required boolean _1; - | required int32 _2; - | required int64 _3; - | required float _4; - | required double _5; - | optional binary _6; - |} - """.stripMargin, - binaryAsString = false) - - testSchemaInference[(Byte, Short, Int, Long, java.sql.Date)]( - "logical integral types", - """ - |message root { - | required int32 _1 (INT_8); - | required int32 _2 (INT_16); - | required int32 _3 (INT_32); - | required int64 _4 (INT_64); - | optional int32 _5 (DATE); - |} - """.stripMargin) - - testSchemaInference[Tuple1[String]]( - "string", - """ - |message root { - | optional binary _1 (UTF8); - |} - """.stripMargin, - binaryAsString = true) - - testSchemaInference[Tuple1[String]]( - "binary enum as string", - """ - |message root { - | optional binary _1 (ENUM); - |} - """.stripMargin) - - testSchemaInference[Tuple1[Seq[Int]]]( - "non-nullable array - non-standard", - """ - |message root { - | optional group _1 (LIST) { - | repeated int32 array; - | } - |} - """.stripMargin) - - testSchemaInference[Tuple1[Seq[Int]]]( - "non-nullable array - standard", - """ - |message root { - | optional group _1 (LIST) { - | repeated group list { - | required int32 element; - | } - | } - |} - """.stripMargin, - followParquetFormatSpec = true) - - testSchemaInference[Tuple1[Seq[Integer]]]( - "nullable array - non-standard", - """ - |message root { - | optional group _1 (LIST) { - | repeated group bag { - | optional int32 array_element; - | } - | } - |} - """.stripMargin) - - testSchemaInference[Tuple1[Seq[Integer]]]( - "nullable array - standard", - """ - |message root { - | optional group _1 (LIST) { - | repeated group list { - | optional int32 element; - | } - | } - |} - """.stripMargin, - followParquetFormatSpec = true) - - testSchemaInference[Tuple1[Map[Int, String]]]( - "map - standard", - """ - |message root { - | optional group _1 (MAP) { - | repeated group key_value { - | required int32 key; - | optional binary value (UTF8); - | } - | } - |} - """.stripMargin, - followParquetFormatSpec = true) - - 
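The expected Catalyst side of these inference tests comes from Scala reflection over the product type. A minimal sketch using the same helpers the suite imports (the printed tree shape is approximate):

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

// Derive the Catalyst schema that the "map - standard" case above checks
// against the Parquet MAP layout.
val inferred = StructType.fromAttributes(
  ScalaReflection.attributesFor[Tuple1[Map[Int, String]]])

println(inferred.treeString)
// root
//  |-- _1: map (nullable = true)
//  |    |-- key: integer
//  |    |-- value: string (valueContainsNull = true)
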
testSchemaInference[Tuple1[Map[Int, String]]]( - "map - non-standard", - """ - |message root { - | optional group _1 (MAP) { - | repeated group map (MAP_KEY_VALUE) { - | required int32 key; - | optional binary value (UTF8); - | } - | } - |} - """.stripMargin) - - testSchemaInference[Tuple1[Pair[Int, String]]]( - "struct", - """ - |message root { - | optional group _1 { - | required int32 _1; - | optional binary _2 (UTF8); - | } - |} - """.stripMargin, - followParquetFormatSpec = true) - - testSchemaInference[Tuple1[Map[Int, (String, Seq[(Int, Double)])]]]( - "deeply nested type - non-standard", - """ - |message root { - | optional group _1 (MAP_KEY_VALUE) { - | repeated group map { - | required int32 key; - | optional group value { - | optional binary _1 (UTF8); - | optional group _2 (LIST) { - | repeated group bag { - | optional group array_element { - | required int32 _1; - | required double _2; - | } - | } - | } - | } - | } - | } - |} - """.stripMargin) - - testSchemaInference[Tuple1[Map[Int, (String, Seq[(Int, Double)])]]]( - "deeply nested type - standard", - """ - |message root { - | optional group _1 (MAP) { - | repeated group key_value { - | required int32 key; - | optional group value { - | optional binary _1 (UTF8); - | optional group _2 (LIST) { - | repeated group list { - | optional group element { - | required int32 _1; - | required double _2; - | } - | } - | } - | } - | } - | } - |} - """.stripMargin, - followParquetFormatSpec = true) - - testSchemaInference[(Option[Int], Map[Int, Option[Double]])]( - "optional types", - """ - |message root { - | optional int32 _1; - | optional group _2 (MAP) { - | repeated group key_value { - | required int32 key; - | optional double value; - | } - | } - |} - """.stripMargin, - followParquetFormatSpec = true) - - // Parquet files generated by parquet-thrift are already handled by the schema converter, but - // let's leave this test here until both read path and write path are all updated. 
- ignore("thrift generated parquet schema") { - // Test for SPARK-4520 -- ensure that thrift generated parquet schema is generated - // as expected from attributes - testSchemaInference[( - Array[Byte], Array[Byte], Array[Byte], Seq[Int], Map[Array[Byte], Seq[Int]])]( - "thrift generated parquet schema", - """ - |message root { - | optional binary _1 (UTF8); - | optional binary _2 (UTF8); - | optional binary _3 (UTF8); - | optional group _4 (LIST) { - | repeated int32 _4_tuple; - | } - | optional group _5 (MAP) { - | repeated group map (MAP_KEY_VALUE) { - | required binary key (UTF8); - | optional group value (LIST) { - | repeated int32 value_tuple; - | } - | } - | } - |} - """.stripMargin, - isThriftDerived = true) - } -} - -class ParquetSchemaSuite extends ParquetSchemaTest { - test("DataType string parser compatibility") { - // This is the generated string from previous versions of the Spark SQL, using the following: - // val schema = StructType(List( - // StructField("c1", IntegerType, false), - // StructField("c2", BinaryType, true))) - val caseClassString = - "StructType(List(StructField(c1,IntegerType,false), StructField(c2,BinaryType,true)))" - - // scalastyle:off - val jsonString = """{"type":"struct","fields":[{"name":"c1","type":"integer","nullable":false,"metadata":{}},{"name":"c2","type":"binary","nullable":true,"metadata":{}}]}""" - // scalastyle:on - - val fromCaseClassString = ParquetTypesConverter.convertFromString(caseClassString) - val fromJson = ParquetTypesConverter.convertFromString(jsonString) - - (fromCaseClassString, fromJson).zipped.foreach { (a, b) => - assert(a.name == b.name) - assert(a.dataType === b.dataType) - assert(a.nullable === b.nullable) - } - } - - test("merge with metastore schema") { - // Field type conflict resolution - assertResult( - StructType(Seq( - StructField("lowerCase", StringType), - StructField("UPPERCase", DoubleType, nullable = false)))) { - - ParquetRelation.mergeMetastoreParquetSchema( - StructType(Seq( - StructField("lowercase", StringType), - StructField("uppercase", DoubleType, nullable = false))), - - StructType(Seq( - StructField("lowerCase", BinaryType), - StructField("UPPERCase", IntegerType, nullable = true)))) - } - - // MetaStore schema is subset of parquet schema - assertResult( - StructType(Seq( - StructField("UPPERCase", DoubleType, nullable = false)))) { - - ParquetRelation.mergeMetastoreParquetSchema( - StructType(Seq( - StructField("uppercase", DoubleType, nullable = false))), - - StructType(Seq( - StructField("lowerCase", BinaryType), - StructField("UPPERCase", IntegerType, nullable = true)))) - } - - // Metastore schema contains additional non-nullable fields. - assert(intercept[Throwable] { - ParquetRelation.mergeMetastoreParquetSchema( - StructType(Seq( - StructField("uppercase", DoubleType, nullable = false), - StructField("lowerCase", BinaryType, nullable = false))), - - StructType(Seq( - StructField("UPPERCase", IntegerType, nullable = true)))) - }.getMessage.contains("detected conflicting schemas")) - - // Conflicting non-nullable field names - intercept[Throwable] { - ParquetRelation.mergeMetastoreParquetSchema( - StructType(Seq(StructField("lower", StringType, nullable = false))), - StructType(Seq(StructField("lowerCase", BinaryType)))) - } - } - - test("merge missing nullable fields from Metastore schema") { - // Standard case: Metastore schema contains additional nullable fields not present - // in the Parquet file schema. 
- assertResult( - StructType(Seq( - StructField("firstField", StringType, nullable = true), - StructField("secondField", StringType, nullable = true), - StructField("thirdfield", StringType, nullable = true)))) { - ParquetRelation.mergeMetastoreParquetSchema( - StructType(Seq( - StructField("firstfield", StringType, nullable = true), - StructField("secondfield", StringType, nullable = true), - StructField("thirdfield", StringType, nullable = true))), - StructType(Seq( - StructField("firstField", StringType, nullable = true), - StructField("secondField", StringType, nullable = true)))) - } - - // Merge should fail if the Metastore contains any additional fields that are not - // nullable. - assert(intercept[Throwable] { - ParquetRelation.mergeMetastoreParquetSchema( - StructType(Seq( - StructField("firstfield", StringType, nullable = true), - StructField("secondfield", StringType, nullable = true), - StructField("thirdfield", StringType, nullable = false))), - StructType(Seq( - StructField("firstField", StringType, nullable = true), - StructField("secondField", StringType, nullable = true)))) - }.getMessage.contains("detected conflicting schemas")) - } - - // ======================================================= - // Tests for converting Parquet LIST to Catalyst ArrayType - // ======================================================= - - testParquetToCatalyst( - "Backwards-compatibility: LIST with nullable element type - 1 - standard", - StructType(Seq( - StructField( - "f1", - ArrayType(IntegerType, containsNull = true), - nullable = true))), - """message root { - | optional group f1 (LIST) { - | repeated group list { - | optional int32 element; - | } - | } - |} - """.stripMargin) - - testParquetToCatalyst( - "Backwards-compatibility: LIST with nullable element type - 2", - StructType(Seq( - StructField( - "f1", - ArrayType(IntegerType, containsNull = true), - nullable = true))), - """message root { - | optional group f1 (LIST) { - | repeated group element { - | optional int32 num; - | } - | } - |} - """.stripMargin) - - testParquetToCatalyst( - "Backwards-compatibility: LIST with non-nullable element type - 1 - standard", - StructType(Seq( - StructField("f1", ArrayType(IntegerType, containsNull = false), nullable = true))), - """message root { - | optional group f1 (LIST) { - | repeated group list { - | required int32 element; - | } - | } - |} - """.stripMargin) - - testParquetToCatalyst( - "Backwards-compatibility: LIST with non-nullable element type - 2", - StructType(Seq( - StructField("f1", ArrayType(IntegerType, containsNull = false), nullable = true))), - """message root { - | optional group f1 (LIST) { - | repeated group element { - | required int32 num; - | } - | } - |} - """.stripMargin) - - testParquetToCatalyst( - "Backwards-compatibility: LIST with non-nullable element type - 3", - StructType(Seq( - StructField("f1", ArrayType(IntegerType, containsNull = false), nullable = true))), - """message root { - | optional group f1 (LIST) { - | repeated int32 element; - | } - |} - """.stripMargin) - - testParquetToCatalyst( - "Backwards-compatibility: LIST with non-nullable element type - 4", - StructType(Seq( - StructField( - "f1", - ArrayType( - StructType(Seq( - StructField("str", StringType, nullable = false), - StructField("num", IntegerType, nullable = false))), - containsNull = false), - nullable = true))), - """message root { - | optional group f1 (LIST) { - | repeated group element { - | required binary str (UTF8); - | required int32 num; - | } - | } - |} - """.stripMargin) - 
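These backwards-compatibility cases can also be exercised by hand. A sketch of what testParquetToCatalyst does for the "non-nullable element type - 3" layout above, with the converter flags at the suite's defaults:

import org.apache.parquet.schema.MessageTypeParser

val converter = new CatalystSchemaConverter(
  assumeBinaryIsString = true,
  assumeInt96IsTimestamp = true,
  followParquetFormatSpec = false)

// Legacy 2-level LIST encoding: a bare repeated field implies a
// non-nullable element type.
val legacyList = MessageTypeParser.parseMessageType(
  """message root {
    |  optional group f1 (LIST) {
    |    repeated int32 element;
    |  }
    |}
  """.stripMargin)

// Yields the Catalyst schema asserted above:
// StructType(Seq(StructField("f1", ArrayType(IntegerType, containsNull = false), nullable = true)))
println(converter.convert(legacyList))
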
- testParquetToCatalyst( - "Backwards-compatibility: LIST with non-nullable element type - 5 - parquet-avro style", - StructType(Seq( - StructField( - "f1", - ArrayType( - StructType(Seq( - StructField("str", StringType, nullable = false))), - containsNull = false), - nullable = true))), - """message root { - | optional group f1 (LIST) { - | repeated group array { - | required binary str (UTF8); - | } - | } - |} - """.stripMargin) - - testParquetToCatalyst( - "Backwards-compatibility: LIST with non-nullable element type - 6 - parquet-thrift style", - StructType(Seq( - StructField( - "f1", - ArrayType( - StructType(Seq( - StructField("str", StringType, nullable = false))), - containsNull = false), - nullable = true))), - """message root { - | optional group f1 (LIST) { - | repeated group f1_tuple { - | required binary str (UTF8); - | } - | } - |} - """.stripMargin) - - // ======================================================= - // Tests for converting Catalyst ArrayType to Parquet LIST - // ======================================================= - - testCatalystToParquet( - "Backwards-compatibility: LIST with nullable element type - 1 - standard", - StructType(Seq( - StructField( - "f1", - ArrayType(IntegerType, containsNull = true), - nullable = true))), - """message root { - | optional group f1 (LIST) { - | repeated group list { - | optional int32 element; - | } - | } - |} - """.stripMargin, - followParquetFormatSpec = true) - - testCatalystToParquet( - "Backwards-compatibility: LIST with nullable element type - 2 - prior to 1.4.x", - StructType(Seq( - StructField( - "f1", - ArrayType(IntegerType, containsNull = true), - nullable = true))), - """message root { - | optional group f1 (LIST) { - | repeated group bag { - | optional int32 array_element; - | } - | } - |} - """.stripMargin) - - testCatalystToParquet( - "Backwards-compatibility: LIST with non-nullable element type - 1 - standard", - StructType(Seq( - StructField( - "f1", - ArrayType(IntegerType, containsNull = false), - nullable = true))), - """message root { - | optional group f1 (LIST) { - | repeated group list { - | required int32 element; - | } - | } - |} - """.stripMargin, - followParquetFormatSpec = true) - - testCatalystToParquet( - "Backwards-compatibility: LIST with non-nullable element type - 2 - prior to 1.4.x", - StructType(Seq( - StructField( - "f1", - ArrayType(IntegerType, containsNull = false), - nullable = true))), - """message root { - | optional group f1 (LIST) { - | repeated int32 array; - | } - |} - """.stripMargin) - - // ==================================================== - // Tests for converting Parquet Map to Catalyst MapType - // ==================================================== - - testParquetToCatalyst( - "Backwards-compatibility: MAP with non-nullable value type - 1 - standard", - StructType(Seq( - StructField( - "f1", - MapType(IntegerType, StringType, valueContainsNull = false), - nullable = true))), - """message root { - | optional group f1 (MAP) { - | repeated group key_value { - | required int32 key; - | required binary value (UTF8); - | } - | } - |} - """.stripMargin) - - testParquetToCatalyst( - "Backwards-compatibility: MAP with non-nullable value type - 2", - StructType(Seq( - StructField( - "f1", - MapType(IntegerType, StringType, valueContainsNull = false), - nullable = true))), - """message root { - | optional group f1 (MAP_KEY_VALUE) { - | repeated group map { - | required int32 num; - | required binary str (UTF8); - | } - | } - |} - """.stripMargin) - - testParquetToCatalyst( - 
"Backwards-compatibility: MAP with non-nullable value type - 3 - prior to 1.4.x", - StructType(Seq( - StructField( - "f1", - MapType(IntegerType, StringType, valueContainsNull = false), - nullable = true))), - """message root { - | optional group f1 (MAP) { - | repeated group map (MAP_KEY_VALUE) { - | required int32 key; - | required binary value (UTF8); - | } - | } - |} - """.stripMargin) - - testParquetToCatalyst( - "Backwards-compatibility: MAP with nullable value type - 1 - standard", - StructType(Seq( - StructField( - "f1", - MapType(IntegerType, StringType, valueContainsNull = true), - nullable = true))), - """message root { - | optional group f1 (MAP) { - | repeated group key_value { - | required int32 key; - | optional binary value (UTF8); - | } - | } - |} - """.stripMargin) - - testParquetToCatalyst( - "Backwards-compatibility: MAP with nullable value type - 2", - StructType(Seq( - StructField( - "f1", - MapType(IntegerType, StringType, valueContainsNull = true), - nullable = true))), - """message root { - | optional group f1 (MAP_KEY_VALUE) { - | repeated group map { - | required int32 num; - | optional binary str (UTF8); - | } - | } - |} - """.stripMargin) - - testParquetToCatalyst( - "Backwards-compatibility: MAP with nullable value type - 3 - parquet-avro style", - StructType(Seq( - StructField( - "f1", - MapType(IntegerType, StringType, valueContainsNull = true), - nullable = true))), - """message root { - | optional group f1 (MAP) { - | repeated group map (MAP_KEY_VALUE) { - | required int32 key; - | optional binary value (UTF8); - | } - | } - |} - """.stripMargin) - - // ==================================================== - // Tests for converting Catalyst MapType to Parquet Map - // ==================================================== - - testCatalystToParquet( - "Backwards-compatibility: MAP with non-nullable value type - 1 - standard", - StructType(Seq( - StructField( - "f1", - MapType(IntegerType, StringType, valueContainsNull = false), - nullable = true))), - """message root { - | optional group f1 (MAP) { - | repeated group key_value { - | required int32 key; - | required binary value (UTF8); - | } - | } - |} - """.stripMargin, - followParquetFormatSpec = true) - - testCatalystToParquet( - "Backwards-compatibility: MAP with non-nullable value type - 2 - prior to 1.4.x", - StructType(Seq( - StructField( - "f1", - MapType(IntegerType, StringType, valueContainsNull = false), - nullable = true))), - """message root { - | optional group f1 (MAP) { - | repeated group map (MAP_KEY_VALUE) { - | required int32 key; - | required binary value (UTF8); - | } - | } - |} - """.stripMargin) - - testCatalystToParquet( - "Backwards-compatibility: MAP with nullable value type - 1 - standard", - StructType(Seq( - StructField( - "f1", - MapType(IntegerType, StringType, valueContainsNull = true), - nullable = true))), - """message root { - | optional group f1 (MAP) { - | repeated group key_value { - | required int32 key; - | optional binary value (UTF8); - | } - | } - |} - """.stripMargin, - followParquetFormatSpec = true) - - testCatalystToParquet( - "Backwards-compatibility: MAP with nullable value type - 3 - prior to 1.4.x", - StructType(Seq( - StructField( - "f1", - MapType(IntegerType, StringType, valueContainsNull = true), - nullable = true))), - """message root { - | optional group f1 (MAP) { - | repeated group map (MAP_KEY_VALUE) { - | required int32 key; - | optional binary value (UTF8); - | } - | } - |} - """.stripMargin) - - // ================================= - // Tests for 
conversion for decimals - // ================================= - - testSchema( - "DECIMAL(1, 0) - standard", - StructType(Seq(StructField("f1", DecimalType(1, 0)))), - """message root { - | optional int32 f1 (DECIMAL(1, 0)); - |} - """.stripMargin, - followParquetFormatSpec = true) - - testSchema( - "DECIMAL(8, 3) - standard", - StructType(Seq(StructField("f1", DecimalType(8, 3)))), - """message root { - | optional int32 f1 (DECIMAL(8, 3)); - |} - """.stripMargin, - followParquetFormatSpec = true) - - testSchema( - "DECIMAL(9, 3) - standard", - StructType(Seq(StructField("f1", DecimalType(9, 3)))), - """message root { - | optional int32 f1 (DECIMAL(9, 3)); - |} - """.stripMargin, - followParquetFormatSpec = true) - - testSchema( - "DECIMAL(18, 3) - standard", - StructType(Seq(StructField("f1", DecimalType(18, 3)))), - """message root { - | optional int64 f1 (DECIMAL(18, 3)); - |} - """.stripMargin, - followParquetFormatSpec = true) - - testSchema( - "DECIMAL(19, 3) - standard", - StructType(Seq(StructField("f1", DecimalType(19, 3)))), - """message root { - | optional fixed_len_byte_array(9) f1 (DECIMAL(19, 3)); - |} - """.stripMargin, - followParquetFormatSpec = true) - - testSchema( - "DECIMAL(1, 0) - prior to 1.4.x", - StructType(Seq(StructField("f1", DecimalType(1, 0)))), - """message root { - | optional fixed_len_byte_array(1) f1 (DECIMAL(1, 0)); - |} - """.stripMargin) - - testSchema( - "DECIMAL(8, 3) - prior to 1.4.x", - StructType(Seq(StructField("f1", DecimalType(8, 3)))), - """message root { - | optional fixed_len_byte_array(4) f1 (DECIMAL(8, 3)); - |} - """.stripMargin) - - testSchema( - "DECIMAL(9, 3) - prior to 1.4.x", - StructType(Seq(StructField("f1", DecimalType(9, 3)))), - """message root { - | optional fixed_len_byte_array(5) f1 (DECIMAL(9, 3)); - |} - """.stripMargin) - - testSchema( - "DECIMAL(18, 3) - prior to 1.4.x", - StructType(Seq(StructField("f1", DecimalType(18, 3)))), - """message root { - | optional fixed_len_byte_array(8) f1 (DECIMAL(18, 3)); - |} - """.stripMargin) -} http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetTest.scala deleted file mode 100644 index 64e9405..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetTest.scala +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.parquet - -import java.io.File - -import scala.reflect.ClassTag -import scala.reflect.runtime.universe.TypeTag - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.test.SQLTestUtils -import org.apache.spark.sql.{DataFrame, SaveMode} - -/** - * A helper trait that provides convenient facilities for Parquet testing. - * - * NOTE: Considering that classes `Tuple1` ... `Tuple22` all extend `Product`, it would be more - * convenient to use tuples rather than special case classes when writing test cases/suites. - * In particular, `Tuple1.apply` can be used to easily wrap a single type/value. - */ -private[sql] trait ParquetTest extends SQLTestUtils { this: SparkFunSuite => - /** - * Writes `data` to a Parquet file, which is then passed to `f` and will be deleted after `f` - * returns. - */ - protected def withParquetFile[T <: Product: ClassTag: TypeTag] - (data: Seq[T]) - (f: String => Unit): Unit = { - withTempPath { file => - sqlContext.createDataFrame(data).write.parquet(file.getCanonicalPath) - f(file.getCanonicalPath) - } - } - - /** - * Writes `data` to a Parquet file and reads it back as a [[DataFrame]], - * which is then passed to `f`. The Parquet file will be deleted after `f` returns. - */ - protected def withParquetDataFrame[T <: Product: ClassTag: TypeTag] - (data: Seq[T]) - (f: DataFrame => Unit): Unit = { - withParquetFile(data)(path => f(sqlContext.read.parquet(path))) - } - - /** - * Writes `data` to a Parquet file, reads it back as a [[DataFrame]] and registers it as a - * temporary table named `tableName`, then calls `f`. The temporary table together with the - * Parquet file will be dropped/deleted after `f` returns. - */ - protected def withParquetTable[T <: Product: ClassTag: TypeTag] - (data: Seq[T], tableName: String) - (f: => Unit): Unit = { - withParquetDataFrame(data) { df => - sqlContext.registerDataFrameAsTable(df, tableName) - withTempTable(tableName)(f) - } - } - - protected def makeParquetFile[T <: Product: ClassTag: TypeTag]( - data: Seq[T], path: File): Unit = { - sqlContext.createDataFrame(data).write.mode(SaveMode.Overwrite).parquet(path.getCanonicalPath) - } - - protected def makeParquetFile[T <: Product: ClassTag: TypeTag]( - df: DataFrame, path: File): Unit = { - df.write.mode(SaveMode.Overwrite).parquet(path.getCanonicalPath) - } - - protected def makePartitionDir( - basePath: File, - defaultPartitionName: String, - partitionCols: (String, Any)*): File = { - val partNames = partitionCols.map { case (k, v) => - val valueString = if (v == null || v == "") defaultPartitionName else v.toString - s"$k=$valueString" - } - - val partDir = partNames.foldLeft(basePath) { (parent, child) => - new File(parent, child) - } - - assert(partDir.mkdirs(), s"Couldn't create directory $partDir") - partDir - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetThriftCompatibilitySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetThriftCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetThriftCompatibilitySuite.scala deleted file mode 100644 index 1c532d7..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetThriftCompatibilitySuite.scala +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements.
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.parquet - -import org.apache.spark.sql.test.TestSQLContext -import org.apache.spark.sql.{Row, SQLContext} - -class ParquetThriftCompatibilitySuite extends ParquetCompatibilityTest { - import ParquetCompatibilityTest._ - - override val sqlContext: SQLContext = TestSQLContext - - private val parquetFilePath = - Thread.currentThread().getContextClassLoader.getResource("parquet-thrift-compat.snappy.parquet") - - test("Read Parquet file generated by parquet-thrift") { - logInfo( - s"""Schema of the Parquet file written by parquet-thrift: - |${readParquetSchema(parquetFilePath.toString)} - """.stripMargin) - - checkAnswer(sqlContext.read.parquet(parquetFilePath.toString), (0 until 10).map { i => - def nullable[T <: AnyRef]: ( => T) => T = makeNullable[T](i) - - val suits = Array("SPADES", "HEARTS", "DIAMONDS", "CLUBS") - - Row( - i % 2 == 0, - i.toByte, - (i + 1).toShort, - i + 2, - i.toLong * 10, - i.toDouble + 0.2d, - // Thrift `BINARY` values are actually unencoded `STRING` values, and thus are always - // treated as `BINARY (UTF8)` in parquet-thrift, since parquet-thrift always assumes - // Thrift `STRING`s are encoded using UTF-8.
- s"val_$i", - s"val_$i", - // Thrift ENUM values are converted to Parquet binaries containing UTF-8 strings - suits(i % 4), - - nullable(i % 2 == 0: java.lang.Boolean), - nullable(i.toByte: java.lang.Byte), - nullable((i + 1).toShort: java.lang.Short), - nullable(i + 2: Integer), - nullable((i * 10).toLong: java.lang.Long), - nullable(i.toDouble + 0.2d: java.lang.Double), - nullable(s"val_$i"), - nullable(s"val_$i"), - nullable(suits(i % 4)), - - Seq.tabulate(3)(n => s"arr_${i + n}"), - // Thrift `SET`s are converted to Parquet `LIST`s - Seq(i), - Seq.tabulate(3)(n => (i + n: Integer) -> s"val_${i + n}").toMap, - Seq.tabulate(3) { n => - (i + n) -> Seq.tabulate(3) { m => - Row(Seq.tabulate(3)(j => i + j + m), s"val_${i + m}") - } - }.toMap) - }) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala index 1907e64..562c279 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -51,7 +51,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { sql( s""" |CREATE TEMPORARY TABLE jsonTable - |USING org.apache.spark.sql.json.DefaultSource + |USING json |OPTIONS ( | path '${path.toString}' |) AS @@ -75,7 +75,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { sql( s""" |CREATE TEMPORARY TABLE jsonTable - |USING org.apache.spark.sql.json.DefaultSource + |USING json |OPTIONS ( | path '${path.toString}' |) AS @@ -92,7 +92,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { sql( s""" |CREATE TEMPORARY TABLE jsonTable - |USING org.apache.spark.sql.json.DefaultSource + |USING json |OPTIONS ( | path '${path.toString}' |) AS @@ -107,7 +107,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { sql( s""" |CREATE TEMPORARY TABLE IF NOT EXISTS jsonTable - |USING org.apache.spark.sql.json.DefaultSource + |USING json |OPTIONS ( | path '${path.toString}' |) AS @@ -122,7 +122,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { sql( s""" |CREATE TEMPORARY TABLE jsonTable - |USING org.apache.spark.sql.json.DefaultSource + |USING json |OPTIONS ( | path '${path.toString}' |) AS @@ -139,7 +139,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { sql( s""" |CREATE TEMPORARY TABLE jsonTable - |USING org.apache.spark.sql.json.DefaultSource + |USING json |OPTIONS ( | path '${path.toString}' |) AS @@ -158,7 +158,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { sql( s""" |CREATE TEMPORARY TABLE IF NOT EXISTS jsonTable - |USING org.apache.spark.sql.json.DefaultSource + |USING json |OPTIONS ( | path '${path.toString}' |) AS @@ -175,7 +175,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { sql( s""" |CREATE TEMPORARY TABLE jsonTable (a int, b string) - |USING org.apache.spark.sql.json.DefaultSource + |USING json |OPTIONS ( | path '${path.toString}' |) AS @@ -188,7 +188,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { sql( s""" |CREATE TEMPORARY TABLE jsonTable - |USING 
org.apache.spark.sql.json.DefaultSource + |USING json |OPTIONS ( | path '${path.toString}' |) AS @@ -199,7 +199,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { sql( s""" |CREATE TEMPORARY TABLE jsonTable - |USING org.apache.spark.sql.json.DefaultSource + |USING json |OPTIONS ( | path '${path.toString}' |) AS
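The CreateTableAsSelectSuite hunks above only swap the fully qualified provider class name for the built-in short name json; the statements are otherwise unchanged. As a minimal usage sketch of the resulting DDL (the output path and the source table jt are placeholders, the suite registers its own fixtures and temp paths):

  import org.apache.spark.sql.test.TestSQLContext

  object CreateTableUsingJsonSketch {
    def main(args: Array[String]): Unit = {
      val sqlContext = TestSQLContext

      // Both provider spellings resolve to the same JSON data source; the
      // short name is simply friendlier in DDL.  Path and source table here
      // are hypothetical.
      sqlContext.sql(
        """
          |CREATE TEMPORARY TABLE jsonTable
          |USING json
          |OPTIONS (
          |  path '/tmp/ctas-example'
          |) AS
          |SELECT a, b FROM jt
        """.stripMargin)
    }
  }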