Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20208#discussion_r162781286
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaEvolutionTest.scala ---
    @@ -0,0 +1,436 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources
    +
    +import java.io.File
    +
    +import org.apache.spark.sql.{QueryTest, Row}
    +import org.apache.spark.sql.functions._
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
    +
    +/**
    + * Schema can evolve in several ways and the following are supported in file-based data sources.
    + *
    + *   1. Add a column
    + *   2. Remove a column
    + *   3. Change a column position
    + *   4. Change a column type
    + *
    + * Here, we consider safe evolution without data loss. For example, data type evolution should be
    + * from small types to larger types like `int`-to-`long`, not vice versa.
    + *
    + * So far, file-based data sources have the following schema evolution coverage.
    + *
    + *   | File Format  | Coverage     | Note                                                   |
    + *   | ------------ | ------------ | ------------------------------------------------------ |
    + *   | TEXT         | N/A          | Schema consists of a single string column.             |
    + *   | CSV          | 1, 2, 4      |                                                        |
    + *   | JSON         | 1, 2, 3, 4   |                                                        |
    + *   | ORC          | 1, 2, 3, 4   | Native vectorized ORC reader has the widest coverage.  |
    + *   | PARQUET      | 1, 2, 3      |                                                        |
    + *
    + * This aims to provide an explicit test coverage for schema evolution on file-based data sources.
    + * Since a file format has its own coverage of schema evolution, we need a test suite
    + * for each file-based data source with corresponding supported test case traits.
    + *
    + * The following is a hierarchy of test traits.
    + *
    + *   SchemaEvolutionTest
    + *     -> AddColumnEvolutionTest
    + *     -> RemoveColumnEvolutionTest
    + *     -> ChangePositionEvolutionTest
    + *     -> BooleanTypeEvolutionTest
    + *     -> IntegralTypeEvolutionTest
    + *     -> ToDoubleTypeEvolutionTest
    + *     -> ToDecimalTypeEvolutionTest
    + */
    +
    +trait SchemaEvolutionTest extends QueryTest with SQLTestUtils with 
SharedSQLContext {
    +  val format: String
    +  val options: Map[String, String] = Map.empty[String, String]
    +}
    +
    +/**
    + * Add column.
    + * This test suite assumes that the missing column should be `null`.
    + */
    +trait AddColumnEvolutionTest extends SchemaEvolutionTest {
    +  import testImplicits._
    +
    +  test("append column at the end") {
    +    withTempDir { dir =>
    +      val path = dir.getCanonicalPath
    +
    +      val df1 = Seq("a", "b").toDF("col1")
    +      val df2 = df1.withColumn("col2", lit("x"))
    +      val df3 = df2.withColumn("col3", lit("y"))
    +
    +      val dir1 = s"$path${File.separator}part=one"
    +      val dir2 = s"$path${File.separator}part=two"
    +      val dir3 = s"$path${File.separator}part=three"
    +
    +      
df1.write.mode("overwrite").format(format).options(options).save(dir1)
    +      
df2.write.mode("overwrite").format(format).options(options).save(dir2)
    +      
df3.write.mode("overwrite").format(format).options(options).save(dir3)
    +
    +      val df = spark.read
    +        .schema(df3.schema)
    +        .format(format)
    +        .options(options)
    +        .load(path)
    +
    +      checkAnswer(df, Seq(
    +        Row("a", null, null, "one"),
    +        Row("b", null, null, "one"),
    +        Row("a", "x", null, "two"),
    +        Row("b", "x", null, "two"),
    +        Row("a", "x", "y", "three"),
    +        Row("b", "x", "y", "three")))
    +    }
    +  }
    +}
    +
    +/**
    + * Remove column.
    + * This test suite is identical to AddColumnEvolutionTest,
    + * but this test suite ensures that the schema and result are truncated to the given schema.
    + */
    +trait RemoveColumnEvolutionTest extends SchemaEvolutionTest {
    +  import testImplicits._
    +
    +  test("remove column at the end") {
    +    withTempDir { dir =>
    +      val path = dir.getCanonicalPath
    +
    +      val df1 = Seq(("1", "a"), ("2", "b")).toDF("col1", "col2")
    +      val df2 = df1.withColumn("col3", lit("y"))
    +
    +      val dir1 = s"$path${File.separator}part=two"
    +      val dir2 = s"$path${File.separator}part=three"
    +
    +      
df1.write.mode("overwrite").format(format).options(options).save(dir1)
    +      
df2.write.mode("overwrite").format(format).options(options).save(dir2)
    +
    +      val df = spark.read
    +        .schema(df1.schema)
    +        .format(format)
    +        .options(options)
    +        .load(path)
    +
    +      checkAnswer(df, Seq(
    +        Row("1", "a", "two"),
    +        Row("2", "b", "two"),
    +        Row("1", "a", "three"),
    +        Row("2", "b", "three")))
    +    }
    +  }
    +}
    +
    +/**
    + * Change column positions.
    + * This suite assumes that all data sets have the same number of columns.
    + */
    +trait ChangePositionEvolutionTest extends SchemaEvolutionTest {
    +  import testImplicits._
    +
    +  test("change column position") {
    +    withTempDir { dir =>
    +      // val path = dir.getCanonicalPath
    +      val path = "/tmp/change"
    +
    +      val df1 = Seq(("1", "a"), ("2", "b"), ("3", "c")).toDF("col1", 
"col2")
    +      val df2 = Seq(("d", "4"), ("e", "5"), ("f", "6")).toDF("col2", 
"col1")
    +      val unionDF = df1.unionByName(df2)
    +
    +      val dir1 = s"$path${File.separator}part=one"
    +      val dir2 = s"$path${File.separator}part=two"
    +
    +      
df1.write.mode("overwrite").format(format).options(options).save(dir1)
    +      
df2.write.mode("overwrite").format(format).options(options).save(dir2)
    +
    +      val df = spark.read
    +        .schema(unionDF.schema)
    +        .format(format)
    +        .options(options)
    +        .load(path)
    +        .select("col1", "col2")
    +
    +      checkAnswer(df, unionDF)
    +    }
    +  }
    +}
    +
    +trait BooleanTypeEvolutionTest extends SchemaEvolutionTest {
    +  import testImplicits._
    +
    +  test("boolean to byte/short/int/long") {
    +    withTempDir { dir =>
    +      val path = dir.getCanonicalPath
    +
    +      val values = (1 to 10).map(_ % 2)
    +      val booleanDF = (1 to 10).map(_ % 2 == 1).toDF("col1")
    +      val byteDF = values.map(_.toByte).toDF("col1")
    +      val shortDF = values.map(_.toShort).toDF("col1")
    +      val intDF = values.toDF("col1")
    +      val longDF = values.map(_.toLong).toDF("col1")
    +
    +      
booleanDF.write.mode("overwrite").format(format).options(options).save(path)
    +
    +      val df1 = spark.read
    +        .schema("col1 byte")
    +        .format(format)
    +        .options(options)
    +        .load(path)
    +      checkAnswer(df1, byteDF)
    +
    +      val df2 = spark.read
    +        .schema("col1 short")
    +        .format(format)
    +        .options(options)
    +        .load(path)
    +      checkAnswer(df2, shortDF)
    +
    +      val df3 = spark.read
    +        .schema("col1 int")
    +        .format(format)
    +        .options(options)
    +        .load(path)
    +      checkAnswer(df3, intDF)
    +
    +      val df4 = spark.read
    +        .schema("col1 long")
    +        .format(format)
    +        .options(options)
    +        .load(path)
    +      checkAnswer(df4, longDF)
    +    }
    +  }
    +}
    +
    +trait IntegralTypeEvolutionTest extends SchemaEvolutionTest {
    +
    +  import testImplicits._
    +
    +  test("change column type from `byte` to `short/int/long`") {
    --- End diff --
    
    nit: `` `byte` `` -> `byte`, or the opposite, for consistency with the other instances.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to