This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new aeff69b [SPARK-24360][SQL] Support Hive 3.1 metastore aeff69b is described below commit aeff69bd879661367367f39b5dfecd9a76223c0b Author: Dongjoon Hyun <dongj...@apache.org> AuthorDate: Wed Jan 30 20:33:21 2019 -0800 [SPARK-24360][SQL] Support Hive 3.1 metastore ## What changes were proposed in this pull request? Hive 3.1.1 has been released. This PR aims to support the Hive 3.1.x metastore. Please note that the Hive 3.0.0 metastore is intentionally skipped. ## How was this patch tested? Passes Jenkins with the updated test cases, including 3.1. Closes #23694 from dongjoon-hyun/SPARK-24360-3.1. Authored-by: Dongjoon Hyun <dongj...@apache.org> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- docs/sql-data-sources-hive-tables.md | 2 +- .../org/apache/spark/sql/hive/HiveUtils.scala | 3 +- .../spark/sql/hive/client/HiveClientImpl.scala | 17 ++- .../apache/spark/sql/hive/client/HiveShim.scala | 126 +++++++++++++++++++++ .../sql/hive/client/IsolatedClientLoader.scala | 1 + .../org/apache/spark/sql/hive/client/package.scala | 9 +- .../spark/sql/hive/execution/SaveAsHiveFile.scala | 2 +- .../spark/sql/hive/client/HiveClientSuite.scala | 23 +++- .../spark/sql/hive/client/HiveClientVersions.scala | 2 +- .../spark/sql/hive/client/HiveVersionSuite.scala | 7 +- .../spark/sql/hive/client/VersionsSuite.scala | 33 +++++- 11 files changed, 206 insertions(+), 19 deletions(-) diff --git a/docs/sql-data-sources-hive-tables.md b/docs/sql-data-sources-hive-tables.md index 3b39a32..14773ca 100644 --- a/docs/sql-data-sources-hive-tables.md +++ b/docs/sql-data-sources-hive-tables.md @@ -115,7 +115,7 @@ The following options can be used to configure the version of Hive that is used <td><code>1.2.1</code></td> <td> Version of the Hive metastore. Available - options are <code>0.12.0</code> through <code>2.3.4</code>. + options are <code>0.12.0</code> through <code>2.3.4</code> and <code>3.1.0</code> through <code>3.1.1</code>.
</td> </tr> <tr> diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 597eef1..38bbe64 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -62,7 +62,8 @@ private[spark] object HiveUtils extends Logging { val HIVE_METASTORE_VERSION = buildConf("spark.sql.hive.metastore.version") .doc("Version of the Hive metastore. Available options are " + - s"<code>0.12.0</code> through <code>2.3.4</code>.") + "<code>0.12.0</code> through <code>2.3.4</code> and " + + "<code>3.1.0</code> through <code>3.1.1</code>.") .stringConf .createWithDefault(builtinHiveVersion) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 5e9b324..bfe19c2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -105,6 +105,7 @@ private[hive] class HiveClientImpl( case hive.v2_1 => new Shim_v2_1() case hive.v2_2 => new Shim_v2_2() case hive.v2_3 => new Shim_v2_3() + case hive.v3_1 => new Shim_v3_1() } // Create an internal session state for this HiveClientImpl. 
@@ -852,11 +853,17 @@ private[hive] class HiveClientImpl( client.getAllTables("default").asScala.foreach { t => logDebug(s"Deleting table $t") val table = client.getTable("default", t) - client.getIndexes("default", t, 255).asScala.foreach { index => - shim.dropIndex(client, "default", t, index.getIndexName) - } - if (!table.isIndexTable) { - client.dropTable("default", t) + try { + client.getIndexes("default", t, 255).asScala.foreach { index => + shim.dropIndex(client, "default", t, index.getIndexName) + } + if (!table.isIndexTable) { + client.dropTable("default", t) + } + } catch { + case _: NoSuchMethodError => + // HIVE-18448 Hive 3.0 remove index APIs + client.dropTable("default", t) } } client.getAllDatabases.asScala.filterNot(_ == "default").foreach { db => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 4d48490..a8ebb23 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -40,6 +40,7 @@ import org.apache.hadoop.hive.serde.serdeConstants import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, CatalogUtils, FunctionResource, FunctionResourceType} @@ -1179,3 +1180,128 @@ private[client] class Shim_v2_1 extends Shim_v2_0 { private[client] class Shim_v2_2 extends Shim_v2_1 private[client] class Shim_v2_3 extends Shim_v2_1 + +private[client] class Shim_v3_1 extends Shim_v2_3 { + // Spark supports only non-ACID operations + protected lazy val isAcidIUDoperation = JBoolean.FALSE + + // Writer ID can be 0 for non-ACID operations + protected lazy val 
writeIdInLoadTableOrPartition: JLong = 0L + + // Statement ID + protected lazy val stmtIdInLoadTableOrPartition: JInteger = 0 + + protected lazy val listBucketingLevel: JInteger = 0 + + private lazy val clazzLoadFileType = getClass.getClassLoader.loadClass( + "org.apache.hadoop.hive.ql.plan.LoadTableDesc$LoadFileType") + + private lazy val loadPartitionMethod = + findMethod( + classOf[Hive], + "loadPartition", + classOf[Path], + classOf[Table], + classOf[JMap[String, String]], + clazzLoadFileType, + JBoolean.TYPE, + JBoolean.TYPE, + JBoolean.TYPE, + JBoolean.TYPE, + JBoolean.TYPE, + classOf[JLong], + JInteger.TYPE, + JBoolean.TYPE) + private lazy val loadTableMethod = + findMethod( + classOf[Hive], + "loadTable", + classOf[Path], + classOf[String], + clazzLoadFileType, + JBoolean.TYPE, + JBoolean.TYPE, + JBoolean.TYPE, + JBoolean.TYPE, + classOf[JLong], + JInteger.TYPE, + JBoolean.TYPE) + private lazy val loadDynamicPartitionsMethod = + findMethod( + classOf[Hive], + "loadDynamicPartitions", + classOf[Path], + classOf[String], + classOf[JMap[String, String]], + clazzLoadFileType, + JInteger.TYPE, + JInteger.TYPE, + JBoolean.TYPE, + JLong.TYPE, + JInteger.TYPE, + JBoolean.TYPE, + classOf[AcidUtils.Operation], + JBoolean.TYPE) + + override def loadPartition( + hive: Hive, + loadPath: Path, + tableName: String, + partSpec: JMap[String, String], + replace: Boolean, + inheritTableSpecs: Boolean, + isSkewedStoreAsSubdir: Boolean, + isSrcLocal: Boolean): Unit = { + val session = SparkSession.getActiveSession + assert(session.nonEmpty) + val database = session.get.sessionState.catalog.getCurrentDatabase + val table = hive.getTable(database, tableName) + val loadFileType = if (replace) { + clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL")) + } else { + clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING")) + } + assert(loadFileType.isDefined) + loadPartitionMethod.invoke(hive, loadPath, table, partSpec, 
loadFileType.get, + inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean, + isSrcLocal: JBoolean, isAcid, hasFollowingStatsTask, + writeIdInLoadTableOrPartition, stmtIdInLoadTableOrPartition, replace: JBoolean) + } + + override def loadTable( + hive: Hive, + loadPath: Path, + tableName: String, + replace: Boolean, + isSrcLocal: Boolean): Unit = { + val loadFileType = if (replace) { + clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL")) + } else { + clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING")) + } + assert(loadFileType.isDefined) + loadTableMethod.invoke(hive, loadPath, tableName, loadFileType.get, isSrcLocal: JBoolean, + isSkewedStoreAsSubdir, isAcidIUDoperation, hasFollowingStatsTask, + writeIdInLoadTableOrPartition, stmtIdInLoadTableOrPartition: JInteger, replace: JBoolean) + } + + override def loadDynamicPartitions( + hive: Hive, + loadPath: Path, + tableName: String, + partSpec: JMap[String, String], + replace: Boolean, + numDP: Int, + listBucketingEnabled: Boolean): Unit = { + val loadFileType = if (replace) { + clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("REPLACE_ALL")) + } else { + clazzLoadFileType.getEnumConstants.find(_.toString.equalsIgnoreCase("KEEP_EXISTING")) + } + assert(loadFileType.isDefined) + loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, loadFileType.get, + numDP: JInteger, listBucketingLevel, isAcid, writeIdInLoadTableOrPartition, + stmtIdInLoadTableOrPartition, hasFollowingStatsTask, AcidUtils.Operation.NOT_ACID, + replace: JBoolean) + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index ca98c30..1f7ab9b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -100,6 +100,7 @@ private[hive] object IsolatedClientLoader extends Logging { case "2.1" | "2.1.0" | "2.1.1" => hive.v2_1 case "2.2" | "2.2.0" => hive.v2_2 case "2.3" | "2.3.0" | "2.3.1" | "2.3.2" | "2.3.3" | "2.3.4" => hive.v2_3 + case "3.1" | "3.1.0" | "3.1.1" => hive.v3_1 case version => throw new UnsupportedOperationException(s"Unsupported Hive Metastore version ($version). " + s"Please set ${HiveUtils.HIVE_METASTORE_VERSION.key} with a valid version.") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala index e4cf729..b6a4949 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala @@ -79,7 +79,14 @@ package object client { exclusions = Seq("org.apache.curator:*", "org.pentaho:pentaho-aggdesigner-algorithm")) - val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3) + // Since Hive 3.0, HookUtils uses org.apache.logging.log4j.util.Strings + case object v3_1 extends HiveVersion("3.1.1", + extraDeps = Seq("org.apache.logging.log4j:log4j-api:2.10.0", + "org.apache.derby:derby:10.14.1.0"), + exclusions = Seq("org.apache.curator:*", + "org.pentaho:pentaho-aggdesigner-algorithm")) + + val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_1) } // scalastyle:on diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala index 078968e..4ddba50 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala @@ -114,7 +114,7 @@ private[hive] trait SaveAsHiveFile extends 
DataWritingCommand { // be removed by Hive when Hive is trying to empty the table directory. val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0) val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = - Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3) + Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_1) // Ensure all the supported versions are considered here. assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath == diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala index 7a325bf..f3d8c2a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala @@ -19,12 +19,16 @@ package org.apache.spark.sql.hive.client import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe +import org.apache.hadoop.mapred.TextInputFormat import org.scalatest.BeforeAndAfterAll +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{BooleanType, IntegerType, LongType} +import org.apache.spark.sql.types.{BooleanType, IntegerType, LongType, StructType} // TODO: Refactor this to `HivePartitionFilteringSuite` class HiveClientSuite(version: String) @@ -46,7 +50,22 @@ class HiveClientSuite(version: String) val hadoopConf = new Configuration() hadoopConf.setBoolean(tryDirectSqlKey, tryDirectSql) val client = buildClient(hadoopConf) - client.runSqlHive("CREATE TABLE test (value INT) PARTITIONED BY (ds INT, h INT, chunk STRING)") + val tableSchema = + new 
StructType().add("value", "int").add("ds", "int").add("h", "int").add("chunk", "string") + val table = CatalogTable( + identifier = TableIdentifier("test", Some("default")), + tableType = CatalogTableType.MANAGED, + schema = tableSchema, + partitionColumnNames = Seq("ds", "h", "chunk"), + storage = CatalogStorageFormat( + locationUri = None, + inputFormat = Some(classOf[TextInputFormat].getName), + outputFormat = Some(classOf[HiveIgnoreKeyTextOutputFormat[_, _]].getName), + serde = Some(classOf[LazySimpleSerDe].getName()), + compressed = false, + properties = Map.empty + )) + client.createTable(table, ignoreIfExists = false) val partitions = for { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala index 30592a3..9b9af79 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala @@ -23,5 +23,5 @@ import org.apache.spark.SparkFunSuite private[client] trait HiveClientVersions { protected val versions = - IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3") + IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3", "3.1") } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala index e5963d0..a45ad1f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala @@ -34,10 +34,15 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu // Hive changed the default of datanucleus.schema.autoCreateAll from true to false and // hive.metastore.schema.verification from false to true since 2.0 // For details, see 
the JIRA HIVE-6113 and HIVE-12463 - if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3") { + if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3" || + version == "3.1") { hadoopConf.set("datanucleus.schema.autoCreateAll", "true") hadoopConf.set("hive.metastore.schema.verification", "false") } + // Since Hive 3.0, HIVE-19310 skipped `ensureDbInit` if `hive.in.test=false`. + if (version == "3.1") { + hadoopConf.set("hive.in.test", "true") + } HiveClientBuilder.buildClient( version, hadoopConf, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 218bd18..b323871 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -103,7 +103,7 @@ class VersionsSuite extends SparkFunSuite with Logging { } private val versions = - Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3") + Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3", "3.1") private var client: HiveClient = null @@ -118,10 +118,15 @@ class VersionsSuite extends SparkFunSuite with Logging { // Hive changed the default of datanucleus.schema.autoCreateAll from true to false and // hive.metastore.schema.verification from false to true since 2.0 // For details, see the JIRA HIVE-6113 and HIVE-12463 - if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3") { + if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3" || + version == "3.1") { hadoopConf.set("datanucleus.schema.autoCreateAll", "true") hadoopConf.set("hive.metastore.schema.verification", "false") } + // Since Hive 3.0, HIVE-19310 skipped `ensureDbInit` if `hive.in.test=false`. 
+ if (version == "3.1") { + hadoopConf.set("hive.in.test", "true") + } client = buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf)) if (versionSpark != null) versionSpark.reset() versionSpark = TestHiveVersion(client) @@ -318,7 +323,20 @@ class VersionsSuite extends SparkFunSuite with Logging { properties = Map.empty) test(s"$version: sql create partitioned table") { - client.runSqlHive("CREATE TABLE src_part (value INT) PARTITIONED BY (key1 INT, key2 INT)") + val table = CatalogTable( + identifier = TableIdentifier("src_part", Some("default")), + tableType = CatalogTableType.MANAGED, + schema = new StructType().add("value", "int").add("key1", "int").add("key2", "int"), + partitionColumnNames = Seq("key1", "key2"), + storage = CatalogStorageFormat( + locationUri = None, + inputFormat = Some(classOf[TextInputFormat].getName), + outputFormat = Some(classOf[HiveIgnoreKeyTextOutputFormat[_, _]].getName), + serde = Some(classOf[LazySimpleSerDe].getName()), + compressed = false, + properties = Map.empty + )) + client.createTable(table, ignoreIfExists = false) } val testPartitionCount = 2 @@ -556,9 +574,12 @@ class VersionsSuite extends SparkFunSuite with Logging { } test(s"$version: sql create index and reset") { - client.runSqlHive("CREATE TABLE indexed_table (key INT)") - client.runSqlHive("CREATE INDEX index_1 ON TABLE indexed_table(key) " + - "as 'COMPACT' WITH DEFERRED REBUILD") + // HIVE-18448 Since Hive 3.0, INDEX is not supported. + if (version != "3.1") { + client.runSqlHive("CREATE TABLE indexed_table (key INT)") + client.runSqlHive("CREATE INDEX index_1 ON TABLE indexed_table(key) " + + "as 'COMPACT' WITH DEFERRED REBUILD") + } } /////////////////////////////////////////////////////////////////////////// --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org