Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22467#discussion_r218844704
  
    --- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSourceSuite.scala 
---
    @@ -0,0 +1,220 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.hive
    +
    +import java.io.File
    +
    +import org.apache.spark.sql.{Row, SaveMode}
    +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
    +import org.apache.spark.sql.execution.datasources.LogicalRelation
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.types._
    +import org.apache.spark.util.Utils
    +
    +/**
    + * A suite of tests for the Parquet support through the data sources API.
    + */
    +class HiveParquetSourceSuite extends ParquetPartitioningTest {
    +  import testImplicits._
    +  import spark._
    +
    +  override def beforeAll(): Unit = {
    +    super.beforeAll()
    +    dropTables("partitioned_parquet",
    +      "partitioned_parquet_with_key",
    +      "partitioned_parquet_with_complextypes",
    +      "partitioned_parquet_with_key_and_complextypes",
    +      "normal_parquet")
    +
    +    sql( s"""
    +      CREATE TEMPORARY VIEW partitioned_parquet
    +      USING org.apache.spark.sql.parquet
    +      OPTIONS (
    +        path '${partitionedTableDir.toURI}'
    +      )
    +    """)
    +
    +    sql( s"""
    +      CREATE TEMPORARY VIEW partitioned_parquet_with_key
    +      USING org.apache.spark.sql.parquet
    +      OPTIONS (
    +        path '${partitionedTableDirWithKey.toURI}'
    +      )
    +    """)
    +
    +    sql( s"""
    +      CREATE TEMPORARY VIEW normal_parquet
    +      USING org.apache.spark.sql.parquet
    +      OPTIONS (
    +        path '${new File(partitionedTableDir, "p=1").toURI}'
    +      )
    +    """)
    +
    +    sql( s"""
    +      CREATE TEMPORARY VIEW partitioned_parquet_with_key_and_complextypes
    +      USING org.apache.spark.sql.parquet
    +      OPTIONS (
    +        path '${partitionedTableDirWithKeyAndComplexTypes.toURI}'
    +      )
    +    """)
    +
    +    sql( s"""
    +      CREATE TEMPORARY VIEW partitioned_parquet_with_complextypes
    +      USING org.apache.spark.sql.parquet
    +      OPTIONS (
    +        path '${partitionedTableDirWithComplexTypes.toURI}'
    +      )
    +    """)
    +  }
    +
    +  test("SPARK-6016 make sure to use the latest footers") {
    +    sql("drop table if exists spark_6016_fix")
    +
    +    // Create a DataFrame with two partitions. So, the created table will 
have two parquet files.
    +    val df1 = (1 to 10).map(Tuple1(_)).toDF("a").coalesce(2)
    +    
df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("spark_6016_fix")
    +    checkAnswer(
    +      sql("select * from spark_6016_fix"),
    +      (1 to 10).map(i => Row(i))
    +    )
    +
    +    // Create a DataFrame with four partitions. So, the created table will 
have four parquet files.
    +    val df2 = (1 to 10).map(Tuple1(_)).toDF("b").coalesce(4)
    +    
df2.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("spark_6016_fix")
    +    // For the bug of SPARK-6016, we are caching two outdated footers for 
df1. Then,
    +    // since the new table has four parquet files, we are trying to read 
new footers from two files
    +    // and then merge metadata in footers of these four (two outdated ones 
and two latest one),
    +    // which will cause an error.
    +    checkAnswer(
    +      sql("select * from spark_6016_fix"),
    +      (1 to 10).map(i => Row(i))
    +    )
    +
    +    sql("drop table spark_6016_fix")
    +  }
    +
    +  test("SPARK-8811: compatibility with array of struct in Hive") {
    +    withTempPath { dir =>
    +      withTable("array_of_struct") {
    +        val conf = Seq(
    +          HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false",
    +          SQLConf.PARQUET_BINARY_AS_STRING.key -> "true",
    +          SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false")
    +
    +        withSQLConf(conf: _*) {
    +          sql(
    +            s"""CREATE TABLE array_of_struct
    +               |STORED AS PARQUET LOCATION '${dir.toURI}'
    +               |AS SELECT
    +               |  '1st' AS a,
    +               |  '2nd' AS b,
    +               |  ARRAY(NAMED_STRUCT('a', 'val_a', 'b', 'val_b')) AS c
    +             """.stripMargin)
    +
    +          checkAnswer(
    +            spark.read.parquet(dir.getCanonicalPath),
    +            Row("1st", "2nd", Seq(Row("val_a", "val_b"))))
    +        }
    +      }
    +    }
    +  }
    +
    +  test("Verify the PARQUET conversion parameter: 
CONVERT_METASTORE_PARQUET") {
    +    withTempView("single") {
    +      val singleRowDF = Seq((0, "foo")).toDF("key", "value")
    +      singleRowDF.createOrReplaceTempView("single")
    +
    +      Seq("true", "false").foreach { parquetConversion =>
    +        withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> 
parquetConversion) {
    +          val tableName = "test_parquet_ctas"
    +          withTable(tableName) {
    +            sql(
    +              s"""
    +                 |CREATE TABLE $tableName STORED AS PARQUET
    +                 |AS SELECT tmp.key, tmp.value FROM single tmp
    +               """.stripMargin)
    +
    +            val df = spark.sql(s"SELECT * FROM $tableName WHERE key=0")
    +            checkAnswer(df, singleRowDF)
    +
    +            val queryExecution = df.queryExecution
    +            if (parquetConversion == "true") {
    +              queryExecution.analyzed.collectFirst {
    +                case _: LogicalRelation =>
    +              }.getOrElse {
    +                fail(s"Expecting the query plan to convert parquet to data 
sources, " +
    +                  s"but got:\n$queryExecution")
    +              }
    +            } else {
    +              queryExecution.analyzed.collectFirst {
    +                case _: HiveTableRelation =>
    +              }.getOrElse {
    +                fail(s"Expecting no conversion from parquet to data 
sources, " +
    +                  s"but got:\n$queryExecution")
    +              }
    +            }
    +          }
    +        }
    +      }
    +    }
    +  }
    +
    +  test("values in arrays and maps stored in parquet are always nullable") {
    +    val df = createDataFrame(Tuple2(Map(2 -> 3), Seq(4, 5, 6)) :: 
Nil).toDF("m", "a")
    +    val mapType1 = MapType(IntegerType, IntegerType, valueContainsNull = 
false)
    +    val arrayType1 = ArrayType(IntegerType, containsNull = false)
    +    val expectedSchema1 =
    +      StructType(
    +        StructField("m", mapType1, nullable = true) ::
    +          StructField("a", arrayType1, nullable = true) :: Nil)
    +    assert(df.schema === expectedSchema1)
    +
    +    withTable("alwaysNullable") {
    +      df.write.format("parquet").saveAsTable("alwaysNullable")
    +
    +      val mapType2 = MapType(IntegerType, IntegerType, valueContainsNull = 
true)
    +      val arrayType2 = ArrayType(IntegerType, containsNull = true)
    +      val expectedSchema2 =
    +        StructType(
    +          StructField("m", mapType2, nullable = true) ::
    +            StructField("a", arrayType2, nullable = true) :: Nil)
    +
    +      assert(table("alwaysNullable").schema === expectedSchema2)
    +
    +      checkAnswer(
    +        sql("SELECT m, a FROM alwaysNullable"),
    +        Row(Map(2 -> 3), Seq(4, 5, 6)))
    +    }
    +  }
    +
    +  test("Aggregation attribute names can't contain special chars \" 
,;{}()\\n\\t=\"") {
    +    val tempDir = Utils.createTempDir()
    --- End diff --
    
    It seems that the tempDir is never deleted? This is also a problem left over from the 
old code; maybe using withTempDir would be better.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to