Repository: spark
Updated Branches:
  refs/heads/branch-2.2 b9adddb6a -> 3c30be53b


[SPARK-22158][SQL][BRANCH-2.2] convertMetastore should not ignore table property

## What changes were proposed in this pull request?

From the beginning, **convertMetastoreOrc** has ignored table properties and used an empty map instead; **convertMetastoreParquet** has the same problem. This PR fixes both.

```scala
// The offending line: the table's storage properties are replaced with an
// empty map, so properties such as `compression` are silently dropped.
val options = Map[String, String]()
```
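With the fix (applied in the diff below), the table's storage properties are passed through instead; the Parquet branch adds the schema-merging flag on top of them. Combined into one snippet for illustration (in the actual code each branch declares its own `options`):

```scala
// ORC branch after the fix: forward the table's storage properties.
val orcOptions = relation.tableMeta.storage.properties

// Parquet branch after the fix: table properties plus the merge-schema flag.
val parquetOptions = relation.tableMeta.storage.properties +
  (ParquetOptions.MERGE_SCHEMA ->
    conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
```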

- [SPARK-14070: HiveMetastoreCatalog.scala](https://github.com/apache/spark/pull/11891/files#diff-ee66e11b56c21364760a5ed2b783f863R650)
- [Master branch: HiveStrategies.scala](https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala#L197)

## How was this patch tested?

Pass the Jenkins with an updated test suite.
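
In outline, the new test covers both formats by setting a `compression` table property at creation time and then checking the codec of the written files. A minimal sketch of the ORC case (mirroring the test added below):

```scala
spark.sql(
  """CREATE TABLE t(id INT) USING hive
    |OPTIONS(fileFormat 'orc', compression 'ZLIB')
  """.stripMargin)
spark.sql("INSERT INTO t SELECT 1")
// Before the fix, the 'compression' property was silently dropped during
// conversion; with the fix, the written ORC files are ZLIB-compressed.
```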

Author: Dongjoon Hyun <dongj...@apache.org>

Closes #19417 from dongjoon-hyun/SPARK-22158-BRANCH-2.2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c30be53
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c30be53
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c30be53

Branch: refs/heads/branch-2.2
Commit: 3c30be53bae5754929c675b50ea932e164d2ff9f
Parents: b9adddb
Author: Dongjoon Hyun <dongj...@apache.org>
Authored: Tue Oct 3 11:42:55 2017 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Tue Oct 3 11:42:55 2017 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/hive/HiveStrategies.scala  |  4 +-
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 54 +++++++++++++++++---
 2 files changed, 50 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3c30be53/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 53e500e..85a88df 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -193,12 +193,12 @@ case class RelationConversions(
   private def convert(relation: HiveTableRelation): LogicalRelation = {
     val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
     if (serde.contains("parquet")) {
-      val options = Map(ParquetOptions.MERGE_SCHEMA ->
+      val options = relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
         conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
       sessionCatalog.metastoreCatalog
         .convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
     } else {
-      val options = Map[String, String]()
+      val options = relation.tableMeta.storage.properties
       sessionCatalog.metastoreCatalog
         .convertToLogicalRelation(relation, options, classOf[OrcFileFormat], "orc")
     }
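
To make the data flow concrete: `options` is handed to `convertToLogicalRelation`, which builds the relation for the converted table, so a metastore property such as `compression` now reaches the file format instead of being discarded. A simplified sketch of the ORC branch, assuming a table stored with `compression 'ZLIB'`:

```scala
val props = relation.tableMeta.storage.properties  // e.g. Map("compression" -> "ZLIB")
sessionCatalog.metastoreCatalog
  .convertToLogicalRelation(relation, props, classOf[OrcFileFormat], "orc")
// The converted relation now carries `props` as its read/write options,
// so the ORC writer sees compression=ZLIB rather than an empty map.
```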

http://git-wip-us.apache.org/repos/asf/spark/blob/3c30be53/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 6b19b5c..c1c8281 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -21,6 +21,8 @@ import java.io.File
 import java.net.URI
 
 import org.apache.hadoop.fs.Path
+import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER
+import org.apache.parquet.hadoop.ParquetFileReader
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.SparkException
@@ -30,6 +32,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
 import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET}
 import org.apache.spark.sql.hive.orc.OrcFileOperator
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
@@ -1438,12 +1441,8 @@ class HiveDDLSuite
         sql("INSERT INTO t SELECT 1")
         checkAnswer(spark.table("t"), Row(1))
         // Check if this is compressed as ZLIB.
-        val maybeOrcFile = path.listFiles().find(!_.getName.endsWith(".crc"))
-        assert(maybeOrcFile.isDefined)
-        val orcFilePath = maybeOrcFile.get.toPath.toString
-        val expectedCompressionKind =
-          OrcFileOperator.getFileReader(orcFilePath).get.getCompression
-        assert("ZLIB" === expectedCompressionKind.name())
+        val maybeOrcFile = path.listFiles().find(_.getName.startsWith("part"))
+        assertCompression(maybeOrcFile, "orc", "ZLIB")
 
         sql("CREATE TABLE t2 USING HIVE AS SELECT 1 AS c1, 'a' AS c2")
         val table2 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t2"))
@@ -1941,4 +1940,47 @@ class HiveDDLSuite
       }
     }
   }
+
+  private def assertCompression(maybeFile: Option[File], format: String, compression: String) = {
+    assert(maybeFile.isDefined)
+
+    val actualCompression = format match {
+      case "orc" =>
+        OrcFileOperator.getFileReader(maybeFile.get.toPath.toString).get.getCompression.name
+
+      case "parquet" =>
+        val footer = ParquetFileReader.readFooter(
+          sparkContext.hadoopConfiguration, new Path(maybeFile.get.getPath), NO_FILTER)
+        footer.getBlocks.get(0).getColumns.get(0).getCodec.toString
+    }
+
+    assert(compression === actualCompression)
+  }
+
+  Seq(("orc", "ZLIB"), ("parquet", "GZIP")).foreach { case (fileFormat, 
compression) =>
+    test(s"SPARK-22158 convertMetastore should not ignore table property - 
$fileFormat") {
+      withSQLConf(CONVERT_METASTORE_ORC.key -> "true", 
CONVERT_METASTORE_PARQUET.key -> "true") {
+        withTable("t") {
+          withTempPath { path =>
+            sql(
+              s"""
+                |CREATE TABLE t(id int) USING hive
+                |OPTIONS(fileFormat '$fileFormat', compression '$compression')
+                |LOCATION '${path.toURI}'
+              """.stripMargin)
+            val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+            assert(DDLUtils.isHiveTable(table))
+            assert(table.storage.serde.get.contains(fileFormat))
+            assert(table.storage.properties.get("compression") == Some(compression))
+            assert(spark.table("t").collect().isEmpty)
+
+            sql("INSERT INTO t SELECT 1")
+            checkAnswer(spark.table("t"), Row(1))
+            val maybeFile = path.listFiles().find(_.getName.startsWith("part"))
+            assertCompression(maybeFile, fileFormat, compression)
+          }
+        }
+      }
+    }
+  }
 }

