Repository: spark Updated Branches: refs/heads/master 6b2baec04 -> c025c3d0a
http://git-wip-us.apache.org/repos/asf/spark/blob/c025c3d0/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 82a8daf..f56fb96 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -22,13 +22,13 @@ import java.io.File import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql._ -import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD} import org.apache.spark.sql.execution.datasources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD} import org.apache.spark.sql.hive.execution.HiveTableScan import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ -import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan} +import org.apache.spark.sql.parquet.ParquetRelation import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -57,7 +57,7 @@ case class ParquetDataWithKeyAndComplexTypes( * A suite to test the automatic conversion of metastore tables with parquet data to use the * built in parquet support. */ -class ParquetMetastoreSuiteBase extends ParquetPartitioningTest { +class ParquetMetastoreSuite extends ParquetPartitioningTest { override def beforeAll(): Unit = { super.beforeAll() @@ -134,6 +134,19 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest { LOCATION '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}' """) + sql( + """ + |create table test_parquet + |( + | intField INT, + | stringField STRING + |) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + |STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + """.stripMargin) + (1 to 10).foreach { p => sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)") } @@ -166,6 +179,7 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest { sql("DROP TABLE normal_parquet") sql("DROP TABLE IF EXISTS jt") sql("DROP TABLE IF EXISTS jt_array") + sql("DROP TABLE IF EXISTS test_parquet") setConf(HiveContext.CONVERT_METASTORE_PARQUET, false) } @@ -176,40 +190,9 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest { }.isEmpty) assert( sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect { - case _: ParquetTableScan => true case _: PhysicalRDD => true }.nonEmpty) } -} - -class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { - val originalConf = conf.parquetUseDataSourceApi - - override def beforeAll(): Unit = { - super.beforeAll() - - sql( - """ - |create table test_parquet - |( - | intField INT, - | stringField STRING - |) - |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' - |STORED AS - | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' - | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' - """.stripMargin) - - conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, true) - } - - override def afterAll(): Unit = { - super.afterAll() - sql("DROP TABLE IF EXISTS test_parquet") - - setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf) - } test("scan an empty parquet table") { checkAnswer(sql("SELECT count(*) FROM test_parquet"), Row(0)) @@ -292,10 +275,10 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { ) table("test_parquet_ctas").queryExecution.optimizedPlan match { - case LogicalRelation(_: ParquetRelation2) => // OK + case LogicalRelation(_: ParquetRelation) => // OK case _ => fail( "test_parquet_ctas should be converted to " + - s"${classOf[ParquetRelation2].getCanonicalName}") + s"${classOf[ParquetRelation].getCanonicalName}") } sql("DROP TABLE IF EXISTS test_parquet_ctas") @@ -316,9 +299,9 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt") df.queryExecution.executedPlan match { - case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation2, _, _)) => // OK + case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation, _, _)) => // OK case o => fail("test_insert_parquet should be converted to a " + - s"${classOf[ParquetRelation2].getCanonicalName} and " + + s"${classOf[ParquetRelation].getCanonicalName} and " + s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan. " + s"However, found a ${o.toString} ") } @@ -346,9 +329,9 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array") df.queryExecution.executedPlan match { - case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation2, _, _)) => // OK + case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation, _, _)) => // OK case o => fail("test_insert_parquet should be converted to a " + - s"${classOf[ParquetRelation2].getCanonicalName} and " + + s"${classOf[ParquetRelation].getCanonicalName} and " + s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." + s"However, found a ${o.toString} ") } @@ -379,17 +362,17 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { assertResult(2) { analyzed.collect { - case r @ LogicalRelation(_: ParquetRelation2) => r + case r @ LogicalRelation(_: ParquetRelation) => r }.size } sql("DROP TABLE ms_convert") } - def collectParquetRelation(df: DataFrame): ParquetRelation2 = { + def collectParquetRelation(df: DataFrame): ParquetRelation = { val plan = df.queryExecution.analyzed plan.collectFirst { - case LogicalRelation(r: ParquetRelation2) => r + case LogicalRelation(r: ParquetRelation) => r }.getOrElse { fail(s"Expecting a ParquetRelation2, but got:\n$plan") } @@ -439,7 +422,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { // Converted test_parquet should be cached. catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match { case null => fail("Converted test_parquet should be cached in the cache.") - case logical @ LogicalRelation(parquetRelation: ParquetRelation2) => // OK + case logical @ LogicalRelation(parquetRelation: ParquetRelation) => // OK case other => fail( "The cached test_parquet should be a Parquet Relation. " + @@ -543,81 +526,10 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { } } -class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase { - val originalConf = conf.parquetUseDataSourceApi - - override def beforeAll(): Unit = { - super.beforeAll() - conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false) - } - - override def afterAll(): Unit = { - super.afterAll() - setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf) - } - - test("MetastoreRelation in InsertIntoTable will not be converted") { - sql( - """ - |create table test_insert_parquet - |( - | intField INT - |) - |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' - |STORED AS - | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' - | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' - """.stripMargin) - - val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt") - df.queryExecution.executedPlan match { - case insert: execution.InsertIntoHiveTable => // OK - case o => fail(s"The SparkPlan should be ${classOf[InsertIntoHiveTable].getCanonicalName}. " + - s"However, found ${o.toString}.") - } - - checkAnswer( - sql("SELECT intField FROM test_insert_parquet WHERE test_insert_parquet.intField > 5"), - sql("SELECT a FROM jt WHERE jt.a > 5").collect() - ) - - sql("DROP TABLE IF EXISTS test_insert_parquet") - } - - // TODO: enable it after the fix of SPARK-5950. - ignore("MetastoreRelation in InsertIntoHiveTable will not be converted") { - sql( - """ - |create table test_insert_parquet - |( - | int_array array<int> - |) - |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' - |STORED AS - | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' - | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' - """.stripMargin) - - val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array") - df.queryExecution.executedPlan match { - case insert: execution.InsertIntoHiveTable => // OK - case o => fail(s"The SparkPlan should be ${classOf[InsertIntoHiveTable].getCanonicalName}. " + - s"However, found ${o.toString}.") - } - - checkAnswer( - sql("SELECT int_array FROM test_insert_parquet"), - sql("SELECT a FROM jt_array").collect() - ) - - sql("DROP TABLE IF EXISTS test_insert_parquet") - } -} - /** * A suite of tests for the Parquet support through the data sources API. */ -class ParquetSourceSuiteBase extends ParquetPartitioningTest { +class ParquetSourceSuite extends ParquetPartitioningTest { override def beforeAll(): Unit = { super.beforeAll() @@ -712,20 +624,6 @@ class ParquetSourceSuiteBase extends ParquetPartitioningTest { } } } -} - -class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase { - val originalConf = conf.parquetUseDataSourceApi - - override def beforeAll(): Unit = { - super.beforeAll() - conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, true) - } - - override def afterAll(): Unit = { - super.afterAll() - setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf) - } test("values in arrays and maps stored in parquet are always nullable") { val df = createDataFrame(Tuple2(Map(2 -> 3), Seq(4, 5, 6)) :: Nil).toDF("m", "a") @@ -734,7 +632,7 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase { val expectedSchema1 = StructType( StructField("m", mapType1, nullable = true) :: - StructField("a", arrayType1, nullable = true) :: Nil) + StructField("a", arrayType1, nullable = true) :: Nil) assert(df.schema === expectedSchema1) df.write.format("parquet").saveAsTable("alwaysNullable") @@ -772,20 +670,6 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase { } } -class ParquetDataSourceOffSourceSuite extends ParquetSourceSuiteBase { - val originalConf = conf.parquetUseDataSourceApi - - override def beforeAll(): Unit = { - super.beforeAll() - conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false) - } - - override def afterAll(): Unit = { - super.afterAll() - setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf) - } -} - /** * A collection of tests for parquet data with various forms of partitioning. */ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org