This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new afd9e2cc0a7 Revert [SPARK-39203][SQL] Rewrite table location to absolute URI based on database URI afd9e2cc0a7 is described below commit afd9e2cc0a73069514eef5c5eb7a3ebed8e4b8cf Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Fri Apr 21 10:28:22 2023 +0900 Revert [SPARK-39203][SQL] Rewrite table location to absolute URI based on database URI ### What changes were proposed in this pull request? This reverts https://github.com/apache/spark/pull/36625 and its follow-up https://github.com/apache/spark/pull/38321 . ### Why are the changes needed? An external table location can be arbitrary and has no connection with the database location. It can therefore be wrong to qualify the external table location based on the database location. If a table written by an old Spark version does not have a qualified location, there is no way to restore it, as the information is already lost. Users who know that such tables live in the same HDFS cluster as the database location can fix the table locations manually themselves. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? N/A Closes #40871 from cloud-fan/minor. 
Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../spark/sql/hive/HiveExternalCatalog.scala | 27 ++-------------- .../spark/sql/hive/client/HiveClientImpl.scala | 36 +++++----------------- .../spark/sql/hive/HiveExternalCatalogSuite.scala | 15 --------- 3 files changed, 10 insertions(+), 68 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 5cd3b9c3abf..794d94a4f70 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.hive import java.io.IOException import java.lang.reflect.InvocationTargetException -import java.net.URI import java.util import java.util.Locale @@ -852,15 +851,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // source tables. Here we set the table location to `locationUri` field and filter out the // path option in storage properties, to avoid exposing this concept externally. val storageWithLocation = { - val tableLocation = getLocationFromStorageProps(table).map { path => - // Before SPARK-19257, created data source table does not use absolute uri. - // This makes Spark can't read these tables across HDFS clusters. - // Rewrite table path to absolute uri based on location uri (The location uri has been - // rewritten by HiveClientImpl.convertHiveTableToCatalogTable) to fix this issue. - toAbsoluteURI(CatalogUtils.stringToURI(path), table.storage.locationUri) - } + val tableLocation = getLocationFromStorageProps(table) // We pass None as `newPath` here, to remove the path option in storage properties. 
- updateLocationInStorageProps(table, newPath = None).copy(locationUri = tableLocation) + updateLocationInStorageProps(table, newPath = None).copy( + locationUri = tableLocation.map(CatalogUtils.stringToURI(_))) } val storageWithoutHiveGeneratedProperties = storageWithLocation.copy(properties = storageWithLocation.properties.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap) @@ -1447,19 +1441,4 @@ object HiveExternalCatalog { isHiveCompatibleDataType(m.keyType) && isHiveCompatibleDataType(m.valueType) case _ => true } - - /** Rewrite uri to absolute location. For example: - * uri: /user/hive/warehouse/test_table - * absoluteUri: viewfs://clusterA/user/hive/warehouse/ - * The result is: viewfs://clusterA/user/hive/warehouse/test_table - */ - private[spark] def toAbsoluteURI(uri: URI, absoluteUri: Option[URI]): URI = { - if (!uri.isAbsolute && absoluteUri.isDefined) { - val aUri = absoluteUri.get - new URI(aUri.getScheme, aUri.getUserInfo, aUri.getHost, aUri.getPort, - uri.getPath, uri.getQuery, uri.getFragment) - } else { - uri - } - } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 92c3ca0ab3e..becca8eae5e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.hive.client import java.io.PrintStream import java.lang.{Iterable => JIterable} import java.lang.reflect.InvocationTargetException -import java.net.URI import java.nio.charset.StandardCharsets.UTF_8 import java.util.{HashMap => JHashMap, Locale, Map => JMap} import java.util.concurrent.TimeUnit._ @@ -54,7 +53,6 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchDatabaseException, NoSuchPartitionException, 
NoSuchPartitionsException, NoSuchTableException, PartitionsAlreadyExistException} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.catalyst.catalog.CatalogUtils.stringToURI import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} import org.apache.spark.sql.catalyst.util.CharVarcharUtils @@ -539,21 +537,7 @@ private[hive] class HiveClientImpl( createTime = h.getTTable.getCreateTime.toLong * 1000, lastAccessTime = h.getLastAccessTime.toLong * 1000, storage = CatalogStorageFormat( - locationUri = shim.getDataLocation(h).map { loc => - val tableUri = stringToURI(loc) - if (h.getTableType == HiveTableType.VIRTUAL_VIEW) { - // Data location of SQL view is useless. Do not qualify it even if it's present, as - // it can be an invalid path. - tableUri - } else { - // Before SPARK-19257, created data source table does not use absolute uri. - // This makes Spark can't read these tables across HDFS clusters. - // Rewrite table location to absolute uri based on database uri to fix this issue. - val absoluteUri = Option(tableUri).filterNot(_.isAbsolute) - .map(_ => stringToURI(client.getDatabase(h.getDbName).getLocationUri)) - HiveExternalCatalog.toAbsoluteURI(tableUri, absoluteUri) - } - }, + locationUri = shim.getDataLocation(h).map(CatalogUtils.stringToURI), // To avoid ClassNotFound exception, we try our best to not get the format class, but get // the class name directly. However, for non-native tables, there is no interface to get // the format class name, so we may still throw ClassNotFound in this case. 
@@ -793,8 +777,7 @@ private[hive] class HiveClientImpl( spec: TablePartitionSpec): Option[CatalogTablePartition] = withHiveState { val hiveTable = rawHiveTable.rawTable.asInstanceOf[HiveTable] val hivePartition = shim.getPartition(client, hiveTable, spec.asJava, false) - Option(hivePartition) - .map(fromHivePartition(_, rawHiveTable.toCatalogTable.storage.locationUri)) + Option(hivePartition).map(fromHivePartition) } override def getPartitions( @@ -816,10 +799,7 @@ private[hive] class HiveClientImpl( assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid") s } - val absoluteUri = shim.getDataLocation(hiveTable).map(stringToURI).filterNot(_.isAbsolute) - .map(_ => stringToURI(client.getDatabase(hiveTable.getDbName).getLocationUri)) - val parts = shim.getPartitions(client, hiveTable, partSpec.asJava) - .map(fromHivePartition(_, absoluteUri)) + val parts = shim.getPartitions(client, hiveTable, partSpec.asJava).map(fromHivePartition) HiveCatalogMetrics.incrementFetchedPartitions(parts.length) parts } @@ -829,9 +809,8 @@ private[hive] class HiveClientImpl( predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState { val hiveTable = rawHiveTable.rawTable.asInstanceOf[HiveTable] hiveTable.setOwner(userName) - val parts = - shim.getPartitionsByFilter(client, hiveTable, predicates, rawHiveTable.toCatalogTable) - .map(fromHivePartition(_, rawHiveTable.toCatalogTable.storage.locationUri)) + val parts = shim.getPartitionsByFilter( + client, hiveTable, predicates, rawHiveTable.toCatalogTable).map(fromHivePartition) HiveCatalogMetrics.incrementFetchedPartitions(parts.length) parts } @@ -1212,7 +1191,7 @@ private[hive] object HiveClientImpl extends Logging { /** * Build the native partition metadata from Hive's Partition. 
*/ - def fromHivePartition(hp: HivePartition, absoluteUri: Option[URI]): CatalogTablePartition = { + def fromHivePartition(hp: HivePartition): CatalogTablePartition = { val apiPartition = hp.getTPartition val properties: Map[String, String] = if (hp.getParameters != null) { hp.getParameters.asScala.toMap @@ -1222,8 +1201,7 @@ private[hive] object HiveClientImpl extends Logging { CatalogTablePartition( spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty), storage = CatalogStorageFormat( - locationUri = Option(HiveExternalCatalog.toAbsoluteURI( - stringToURI(apiPartition.getSd.getLocation), absoluteUri)), + locationUri = Option(CatalogUtils.stringToURI(apiPartition.getSd.getLocation)), inputFormat = Option(apiPartition.getSd.getInputFormat), outputFormat = Option(apiPartition.getSd.getOutputFormat), serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib), diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala index 7c36198c326..e413e0ee73c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala @@ -200,19 +200,4 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite { assert(alteredTable.provider === Some("foo")) }) } - - test("SPARK-39203: Rewrite table location to absolute location based on database location") { - val tableLocation1 = CatalogUtils.stringToURI("/user/hive/warehouse/t1") - val tableLocation2 = CatalogUtils.stringToURI("viewfs://clusterB/user/hive/warehouse/t2") - val dbLocation = CatalogUtils.stringToURI("viewfs://clusterA/user/hive/warehouse/") - - assert(HiveExternalCatalog.toAbsoluteURI(tableLocation1, Some(dbLocation)) - .equals(CatalogUtils.stringToURI("viewfs://clusterA/user/hive/warehouse/t1"))) - - assert(HiveExternalCatalog.toAbsoluteURI(tableLocation1, None) - 
.equals(tableLocation1)) - - assert(HiveExternalCatalog.toAbsoluteURI(tableLocation2, Some(dbLocation)) - .equals(tableLocation2)) - } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org