Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22467#discussion_r218844704 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSourceSuite.scala --- @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import java.io.File + +import org.apache.spark.sql.{Row, SaveMode} +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils + +/** + * A suite of tests for the Parquet support through the data sources API. + */ +class HiveParquetSourceSuite extends ParquetPartitioningTest { + import testImplicits._ + import spark._ + + override def beforeAll(): Unit = { + super.beforeAll() + dropTables("partitioned_parquet", + "partitioned_parquet_with_key", + "partitioned_parquet_with_complextypes", + "partitioned_parquet_with_key_and_complextypes", + "normal_parquet") + + sql( s""" + CREATE TEMPORARY VIEW partitioned_parquet + USING org.apache.spark.sql.parquet + OPTIONS ( + path '${partitionedTableDir.toURI}' + ) + """) + + sql( s""" + CREATE TEMPORARY VIEW partitioned_parquet_with_key + USING org.apache.spark.sql.parquet + OPTIONS ( + path '${partitionedTableDirWithKey.toURI}' + ) + """) + + sql( s""" + CREATE TEMPORARY VIEW normal_parquet + USING org.apache.spark.sql.parquet + OPTIONS ( + path '${new File(partitionedTableDir, "p=1").toURI}' + ) + """) + + sql( s""" + CREATE TEMPORARY VIEW partitioned_parquet_with_key_and_complextypes + USING org.apache.spark.sql.parquet + OPTIONS ( + path '${partitionedTableDirWithKeyAndComplexTypes.toURI}' + ) + """) + + sql( s""" + CREATE TEMPORARY VIEW partitioned_parquet_with_complextypes + USING org.apache.spark.sql.parquet + OPTIONS ( + path '${partitionedTableDirWithComplexTypes.toURI}' + ) + """) + } + + test("SPARK-6016 make sure to use the latest footers") { + sql("drop table if exists spark_6016_fix") + + // Create a DataFrame with two partitions. So, the created table will have two parquet files. + val df1 = (1 to 10).map(Tuple1(_)).toDF("a").coalesce(2) + df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("spark_6016_fix") + checkAnswer( + sql("select * from spark_6016_fix"), + (1 to 10).map(i => Row(i)) + ) + + // Create a DataFrame with four partitions. So, the created table will have four parquet files. + val df2 = (1 to 10).map(Tuple1(_)).toDF("b").coalesce(4) + df2.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("spark_6016_fix") + // For the bug of SPARK-6016, we are caching two outdated footers for df1. Then, + // since the new table has four parquet files, we are trying to read new footers from two files + // and then merge metadata in footers of these four (two outdated ones and two latest one), + // which will cause an error. + checkAnswer( + sql("select * from spark_6016_fix"), + (1 to 10).map(i => Row(i)) + ) + + sql("drop table spark_6016_fix") + } + + test("SPARK-8811: compatibility with array of struct in Hive") { + withTempPath { dir => + withTable("array_of_struct") { + val conf = Seq( + HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false", + SQLConf.PARQUET_BINARY_AS_STRING.key -> "true", + SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false") + + withSQLConf(conf: _*) { + sql( + s"""CREATE TABLE array_of_struct + |STORED AS PARQUET LOCATION '${dir.toURI}' + |AS SELECT + | '1st' AS a, + | '2nd' AS b, + | ARRAY(NAMED_STRUCT('a', 'val_a', 'b', 'val_b')) AS c + """.stripMargin) + + checkAnswer( + spark.read.parquet(dir.getCanonicalPath), + Row("1st", "2nd", Seq(Row("val_a", "val_b")))) + } + } + } + } + + test("Verify the PARQUET conversion parameter: CONVERT_METASTORE_PARQUET") { + withTempView("single") { + val singleRowDF = Seq((0, "foo")).toDF("key", "value") + singleRowDF.createOrReplaceTempView("single") + + Seq("true", "false").foreach { parquetConversion => + withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> parquetConversion) { + val tableName = "test_parquet_ctas" + withTable(tableName) { + sql( + s""" + |CREATE TABLE $tableName STORED AS PARQUET + |AS SELECT tmp.key, tmp.value FROM single tmp + """.stripMargin) + + val df = spark.sql(s"SELECT * FROM $tableName WHERE key=0") + checkAnswer(df, singleRowDF) + + val queryExecution = df.queryExecution + if (parquetConversion == "true") { + queryExecution.analyzed.collectFirst { + case _: LogicalRelation => + }.getOrElse { + fail(s"Expecting the query plan to convert parquet to data sources, " + + s"but got:\n$queryExecution") + } + } else { + queryExecution.analyzed.collectFirst { + case _: HiveTableRelation => + }.getOrElse { + fail(s"Expecting no conversion from parquet to data sources, " + + s"but got:\n$queryExecution") + } + } + } + } + } + } + } + + test("values in arrays and maps stored in parquet are always nullable") { + val df = createDataFrame(Tuple2(Map(2 -> 3), Seq(4, 5, 6)) :: Nil).toDF("m", "a") + val mapType1 = MapType(IntegerType, IntegerType, valueContainsNull = false) + val arrayType1 = ArrayType(IntegerType, containsNull = false) + val expectedSchema1 = + StructType( + StructField("m", mapType1, nullable = true) :: + StructField("a", arrayType1, nullable = true) :: Nil) + assert(df.schema === expectedSchema1) + + withTable("alwaysNullable") { + df.write.format("parquet").saveAsTable("alwaysNullable") + + val mapType2 = MapType(IntegerType, IntegerType, valueContainsNull = true) + val arrayType2 = ArrayType(IntegerType, containsNull = true) + val expectedSchema2 = + StructType( + StructField("m", mapType2, nullable = true) :: + StructField("a", arrayType2, nullable = true) :: Nil) + + assert(table("alwaysNullable").schema === expectedSchema2) + + checkAnswer( + sql("SELECT m, a FROM alwaysNullable"), + Row(Map(2 -> 3), Seq(4, 5, 6))) + } + } + + test("Aggregation attribute names can't contain special chars \" ,;{}()\\n\\t=\"") { + val tempDir = Utils.createTempDir() --- End diff -- It seems that the tempDir never delete? Its also the remaining problem in old code, maybe with temp dir is better.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org