Repository: spark
Updated Branches:
  refs/heads/master e23370ec6 -> d1c1fbc34
[SPARK-15715][SQL] Fix alter partition with storage information in Hive

## What changes were proposed in this pull request?

This command previously had no effect on Hive tables, because the conversion to a Hive partition kept only the partition's location and dropped all other storage information. Now the following works:
```
ALTER TABLE boxes PARTITION (width=3)
SET SERDE 'com.sparkbricks.serde.ColumnarSerDe'
WITH SERDEPROPERTIES ('compress'='true')
```

## How was this patch tested?

`HiveExternalCatalogSuite`

Author: Andrew Or <and...@databricks.com>

Closes #13453 from andrewor14/alter-partition-storage.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d1c1fbc3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d1c1fbc3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d1c1fbc3

Branch: refs/heads/master
Commit: d1c1fbc345a704a2c8210960683f33f945660d5a
Parents: e23370e
Author: Andrew Or <and...@databricks.com>
Authored: Thu Jun 2 17:44:48 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Thu Jun 2 17:44:48 2016 -0700

----------------------------------------------------------------------
 .../catalyst/catalog/ExternalCatalogSuite.scala | 10 +++++++
 .../spark/sql/hive/client/HiveClientImpl.scala  | 30 ++++++++++++++------
 .../spark/sql/hive/client/VersionsSuite.scala   |  5 +++-
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 22 ++++++++++++++
 4 files changed, 57 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d1c1fbc3/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 377e64b..0c4d363 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -382,6 +382,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     // See HIVE-2742 for more detail.
     catalog.setCurrentDatabase("db2")
     val newLocation = newUriForDatabase()
+    val newSerde = "com.sparkbricks.text.EasySerde"
+    val newSerdeProps = Map("spark" -> "bricks", "compressed" -> "false")
     // alter but keep spec the same
     val oldPart1 = catalog.getPartition("db2", "tbl2", part1.spec)
     val oldPart2 = catalog.getPartition("db2", "tbl2", part2.spec)
@@ -394,6 +396,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     assert(newPart2.storage.locationUri == Some(newLocation))
     assert(oldPart1.storage.locationUri != Some(newLocation))
     assert(oldPart2.storage.locationUri != Some(newLocation))
+    // alter other storage information
+    catalog.alterPartitions("db2", "tbl2", Seq(
+      oldPart1.copy(storage = storageFormat.copy(serde = Some(newSerde))),
+      oldPart2.copy(storage = storageFormat.copy(serdeProperties = newSerdeProps))))
+    val newPart1b = catalog.getPartition("db2", "tbl2", part1.spec)
+    val newPart2b = catalog.getPartition("db2", "tbl2", part2.spec)
+    assert(newPart1b.storage.serde == Some(newSerde))
+    assert(newPart2b.storage.serdeProperties == newSerdeProps)
     // alter but change spec, should fail because new partition specs do not exist yet
     val badPart1 = part1.copy(spec = Map("a" -> "v1", "b" -> "v2"))
     val badPart2 = part2.copy(spec = Map("a" -> "v3", "b" -> "v4"))
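As a rough standalone sketch of what the new assertions above exercise (not part of the commit): it assumes a Hive-enabled session named `spark`, uses the 2.0-era field name `serdeProperties` shown in the diff (later releases renamed it to `properties`), and relies on `spark.sessionState`, which is package-private in some Spark versions, so code like this normally lives inside Spark's own source tree as the suite does.
```
import org.apache.spark.sql.catalyst.TableIdentifier

// Fetch the current partition metadata, rewrite only its storage fields, and
// push the change back through alterPartitions -- the path this patch fixes.
val catalog = spark.sessionState.catalog
val table = TableIdentifier("boxes")
val spec = Map("width" -> "3")

val oldPart = catalog.getPartition(table, spec)
val newStorage = oldPart.storage.copy(
  serde = Some("com.sparkbricks.text.EasySerde"),   // illustrative serde class
  serdeProperties = Map("compressed" -> "false"))   // illustrative properties
catalog.alterPartitions(table, Seq(oldPart.copy(storage = newStorage)))

// Before the fix the Hive client dropped everything but the location;
// after it, the serde round-trips through the metastore.
assert(catalog.getPartition(table, spec).storage.serde ==
  Some("com.sparkbricks.text.EasySerde"))
```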
http://git-wip-us.apache.org/repos/asf/spark/blob/d1c1fbc3/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 47fa418..1c89d8c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
 import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema}
+import org.apache.hadoop.hive.metastore.api.{SerDeInfo, StorageDescriptor}
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable}
 import org.apache.hadoop.hive.ql.processors._
@@ -779,18 +780,29 @@ private[hive] class HiveClientImpl(
     hiveTable
   }
 
-  private def toHiveViewTable(view: CatalogTable): HiveTable = {
-    val tbl = toHiveTable(view)
-    tbl.setTableType(HiveTableType.VIRTUAL_VIEW)
-    tbl.setSerializationLib(null)
-    tbl.clearSerDeInfo()
-    tbl
-  }
-
   private def toHivePartition(
       p: CatalogTablePartition,
       ht: HiveTable): HivePartition = {
-    new HivePartition(ht, p.spec.asJava, p.storage.locationUri.map { l => new Path(l) }.orNull)
+    val tpart = new org.apache.hadoop.hive.metastore.api.Partition
+    val partValues = ht.getPartCols.asScala.map { hc =>
+      p.spec.get(hc.getName).getOrElse {
+        throw new IllegalArgumentException(
+          s"Partition spec is missing a value for column '${hc.getName}': ${p.spec}")
+      }
+    }
+    val storageDesc = new StorageDescriptor
+    val serdeInfo = new SerDeInfo
+    p.storage.locationUri.foreach(storageDesc.setLocation)
+    p.storage.inputFormat.foreach(storageDesc.setInputFormat)
+    p.storage.outputFormat.foreach(storageDesc.setOutputFormat)
+    p.storage.serde.foreach(serdeInfo.setSerializationLib)
+    serdeInfo.setParameters(p.storage.serdeProperties.asJava)
+    storageDesc.setSerdeInfo(serdeInfo)
+    tpart.setDbName(ht.getDbName)
+    tpart.setTableName(ht.getTableName)
+    tpart.setValues(partValues.asJava)
+    tpart.setSd(storageDesc)
+    new HivePartition(ht, tpart)
   }
 
   private def fromHivePartition(hp: HivePartition): CatalogTablePartition = {
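The heart of the fix is the new `toHivePartition` above: instead of the three-argument `HivePartition` constructor, which carries only a location, the client now assembles a full Thrift `Partition`. Below is a condensed, self-contained sketch of that construction against the Hive metastore API; the helper name and sample values are illustrative, not from the patch, and the real method additionally copies input/output formats and validates the spec against the table's partition columns.
```
import scala.collection.JavaConverters._
import org.apache.hadoop.hive.metastore.api.{Partition, SerDeInfo, StorageDescriptor}

object ThriftPartitionSketch {
  // Copy every storage field -- not just the location -- onto the Thrift
  // Partition, so that alterPartitions round-trips serde information.
  def buildThriftPartition(
      dbName: String,
      tableName: String,
      partValues: Seq[String],
      location: Option[String],
      serde: Option[String],
      serdeProps: Map[String, String]): Partition = {
    val sd = new StorageDescriptor
    val serdeInfo = new SerDeInfo
    location.foreach(sd.setLocation)
    serde.foreach(serdeInfo.setSerializationLib)
    serdeInfo.setParameters(serdeProps.asJava)
    sd.setSerdeInfo(serdeInfo)

    val tpart = new Partition
    tpart.setDbName(dbName)
    tpart.setTableName(tableName)
    tpart.setValues(partValues.asJava)  // one value per partition column, in order
    tpart.setSd(sd)
    tpart
  }

  def main(args: Array[String]): Unit = {
    val p = buildThriftPartition(
      dbName = "default",
      tableName = "boxes",
      partValues = Seq("3"),
      location = Some("hdfs:///warehouse/boxes/width=3"),
      serde = Some("com.sparkbricks.serde.ColumnarSerDe"),
      serdeProps = Map("compress" -> "true"))
    println(p.getSd.getSerdeInfo.getSerializationLib)
  }
}
```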
http://git-wip-us.apache.org/repos/asf/spark/blob/d1c1fbc3/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 8ae4535..5b209ac 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -354,7 +354,10 @@ class VersionsSuite extends SparkFunSuite with Logging {
     test(s"$version: alterPartitions") {
       val spec = Map("key1" -> "1", "key2" -> "2")
       val newLocation = Utils.createTempDir().getPath()
-      val storage = storageFormat.copy(locationUri = Some(newLocation))
+      val storage = storageFormat.copy(
+        locationUri = Some(newLocation),
+        // needed for 0.12 alter partitions
+        serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
       val partition = CatalogTablePartition(spec, storage)
       client.alterPartitions("default", "src_part", Seq(partition))
       assert(client.getPartition("default", "src_part", spec)


http://git-wip-us.apache.org/repos/asf/spark/blob/d1c1fbc3/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index a98d469..b2f01fc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -345,6 +345,28 @@ class HiveDDLSuite
     }
   }
 
+  test("alter table partition - storage information") {
+    sql("CREATE TABLE boxes (height INT, length INT) PARTITIONED BY (width INT)")
+    sql("INSERT OVERWRITE TABLE boxes PARTITION (width=4) SELECT 4, 4")
+    val catalog = spark.sessionState.catalog
+    val expectedSerde = "com.sparkbricks.serde.ColumnarSerDe"
+    val expectedSerdeProps = Map("compress" -> "true")
+    val expectedSerdePropsString =
+      expectedSerdeProps.map { case (k, v) => s"'$k'='$v'" }.mkString(", ")
+    val oldPart = catalog.getPartition(TableIdentifier("boxes"), Map("width" -> "4"))
+    assume(oldPart.storage.serde != Some(expectedSerde), "bad test: serde was already set")
+    assume(oldPart.storage.serdeProperties.filterKeys(expectedSerdeProps.contains) !=
+      expectedSerdeProps, "bad test: serde properties were already set")
+    sql(s"""ALTER TABLE boxes PARTITION (width=4)
+      | SET SERDE '$expectedSerde'
+      | WITH SERDEPROPERTIES ($expectedSerdePropsString)
+      |""".stripMargin)
+    val newPart = catalog.getPartition(TableIdentifier("boxes"), Map("width" -> "4"))
+    assert(newPart.storage.serde == Some(expectedSerde))
+    assert(newPart.storage.serdeProperties.filterKeys(expectedSerdeProps.contains) ==
+      expectedSerdeProps)
+  }
+
   test("drop table using drop view") {
     withTable("tab1") {
       sql("CREATE TABLE tab1(c1 int)")
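For an end-to-end check outside the suites, a usage sketch of the fixed statement (our own illustration, not part of the commit): `com.sparkbricks.serde.ColumnarSerDe` is the same placeholder class the commit message uses and would need to be on the classpath before the partition could actually be read, and `DESCRIBE ... PARTITION` is supported on newer Spark releases; on 2.0-era builds the metastore state would be checked through the catalog instead, as the suites do.
```
import org.apache.spark.sql.SparkSession

object AlterPartitionDemo {
  def main(args: Array[String]): Unit = {
    // Hive support is required; with the in-memory catalog the code path
    // fixed here (HiveClientImpl.toHivePartition) is never exercised.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("alter-partition-demo")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("CREATE TABLE boxes (height INT, length INT) PARTITIONED BY (width INT)")
    spark.sql("INSERT OVERWRITE TABLE boxes PARTITION (width=3) SELECT 3, 3")

    // The statement fixed by this patch:
    spark.sql(
      """ALTER TABLE boxes PARTITION (width=3)
        |SET SERDE 'com.sparkbricks.serde.ColumnarSerDe'
        |WITH SERDEPROPERTIES ('compress'='true')""".stripMargin)

    // The partition's storage section should now list the new serde and
    // the 'compress' property.
    spark.sql("DESCRIBE FORMATTED boxes PARTITION (width=3)").show(100, truncate = false)

    spark.stop()
  }
}
```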