This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new afd9e2cc0a7 Revert [SPARK-39203][SQL] Rewrite table location to absolute URI based on database URI
afd9e2cc0a7 is described below

commit afd9e2cc0a73069514eef5c5eb7a3ebed8e4b8cf
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Fri Apr 21 10:28:22 2023 +0900

    Revert [SPARK-39203][SQL] Rewrite table location to absolute URI based on database URI
    
    ### What changes were proposed in this pull request?
    
    This reverts https://github.com/apache/spark/pull/36625 and its follow-up https://github.com/apache/spark/pull/38321.
    
    ### Why are the changes needed?
    
    An external table's location can be arbitrary and has no connection to the
    database location, so qualifying the external table location based on the
    database location can be wrong.
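
    For context, a minimal sketch of the qualification logic being reverted; it
    mirrors the removed `toAbsoluteURI` helper shown in the diff below, and the
    viewfs paths are illustrative only:

        import java.net.URI

        // Reverted behavior: a relative table path inherits the scheme and
        // authority of the database location.
        def toAbsoluteURI(uri: URI, absoluteUri: Option[URI]): URI = {
          if (!uri.isAbsolute && absoluteUri.isDefined) {
            val aUri = absoluteUri.get
            new URI(aUri.getScheme, aUri.getUserInfo, aUri.getHost, aUri.getPort,
              uri.getPath, uri.getQuery, uri.getFragment)
          } else {
            uri
          }
        }

        val dbLocation = new URI("viewfs://clusterA/user/hive/warehouse/")
        // External table whose data actually lives on a different cluster,
        // stored with an unqualified path by an old Spark version.
        val extTableLocation = new URI("/data/ext_table")
        // Yields viewfs://clusterA/data/ext_table, which may not be where the data is.
        val qualified = toAbsoluteURI(extTableLocation, Some(dbLocation))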
    
    If a table written by an old Spark version does not have a qualified
    location, there is no way to restore it, as the information is already
    lost. Users can manually fix such table locations themselves, for example
    when they know the tables reside on the same HDFS cluster as the database.
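
    For example, one possible manual fix, assuming the table's data is known to
    live on the same cluster as the database (the table name and path below are
    illustrative, not part of this change):

        import org.apache.spark.sql.SparkSession

        val spark = SparkSession.builder()
          .appName("fix-table-location")
          .enableHiveSupport()
          .getOrCreate()

        // Point the table at a fully qualified location on the database's cluster.
        spark.sql(
          "ALTER TABLE mydb.old_table SET LOCATION " +
            "'viewfs://clusterA/user/hive/warehouse/old_table'")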
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    N/A
    
    Closes #40871 from cloud-fan/minor.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../spark/sql/hive/HiveExternalCatalog.scala       | 27 ++--------------
 .../spark/sql/hive/client/HiveClientImpl.scala     | 36 +++++-----------------
 .../spark/sql/hive/HiveExternalCatalogSuite.scala  | 15 ---------
 3 files changed, 10 insertions(+), 68 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 5cd3b9c3abf..794d94a4f70 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive
 
 import java.io.IOException
 import java.lang.reflect.InvocationTargetException
-import java.net.URI
 import java.util
 import java.util.Locale
 
@@ -852,15 +851,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     // source tables. Here we set the table location to `locationUri` field and filter out the
     // path option in storage properties, to avoid exposing this concept externally.
     val storageWithLocation = {
-      val tableLocation = getLocationFromStorageProps(table).map { path =>
-        // Before SPARK-19257, created data source table does not use absolute uri.
-        // This makes Spark can't read these tables across HDFS clusters.
-        // Rewrite table path to absolute uri based on location uri (The location uri has been
-        // rewritten by HiveClientImpl.convertHiveTableToCatalogTable) to fix this issue.
-        toAbsoluteURI(CatalogUtils.stringToURI(path), table.storage.locationUri)
-      }
+      val tableLocation = getLocationFromStorageProps(table)
       // We pass None as `newPath` here, to remove the path option in storage properties.
-      updateLocationInStorageProps(table, newPath = None).copy(locationUri = tableLocation)
+      updateLocationInStorageProps(table, newPath = None).copy(
+        locationUri = tableLocation.map(CatalogUtils.stringToURI(_)))
     }
     val storageWithoutHiveGeneratedProperties = storageWithLocation.copy(properties =
       storageWithLocation.properties.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap)
@@ -1447,19 +1441,4 @@ object HiveExternalCatalog {
       isHiveCompatibleDataType(m.keyType) && isHiveCompatibleDataType(m.valueType)
     case _ => true
   }
-
-  /** Rewrite uri to absolute location. For example:
-   *    uri: /user/hive/warehouse/test_table
-   *    absoluteUri: viewfs://clusterA/user/hive/warehouse/
-   *    The result is: viewfs://clusterA/user/hive/warehouse/test_table
-   */
-  private[spark] def toAbsoluteURI(uri: URI, absoluteUri: Option[URI]): URI = {
-    if (!uri.isAbsolute && absoluteUri.isDefined) {
-      val aUri = absoluteUri.get
-      new URI(aUri.getScheme, aUri.getUserInfo, aUri.getHost, aUri.getPort,
-        uri.getPath, uri.getQuery, uri.getFragment)
-    } else {
-      uri
-    }
-  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 92c3ca0ab3e..becca8eae5e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hive.client
 import java.io.PrintStream
 import java.lang.{Iterable => JIterable}
 import java.lang.reflect.InvocationTargetException
-import java.net.URI
 import java.nio.charset.StandardCharsets.UTF_8
 import java.util.{HashMap => JHashMap, Locale, Map => JMap}
 import java.util.concurrent.TimeUnit._
@@ -54,7 +53,6 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, NoSuchTableException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.catalog.CatalogUtils.stringToURI
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
@@ -539,21 +537,7 @@ private[hive] class HiveClientImpl(
       createTime = h.getTTable.getCreateTime.toLong * 1000,
       lastAccessTime = h.getLastAccessTime.toLong * 1000,
       storage = CatalogStorageFormat(
-        locationUri = shim.getDataLocation(h).map { loc =>
-          val tableUri = stringToURI(loc)
-          if (h.getTableType == HiveTableType.VIRTUAL_VIEW) {
-            // Data location of SQL view is useless. Do not qualify it even if it's present, as
-            // it can be an invalid path.
-            tableUri
-          } else {
-            // Before SPARK-19257, created data source table does not use absolute uri.
-            // This makes Spark can't read these tables across HDFS clusters.
-            // Rewrite table location to absolute uri based on database uri to fix this issue.
-            val absoluteUri = Option(tableUri).filterNot(_.isAbsolute)
-              .map(_ => stringToURI(client.getDatabase(h.getDbName).getLocationUri))
-            HiveExternalCatalog.toAbsoluteURI(tableUri, absoluteUri)
-          }
-        },
+        locationUri = shim.getDataLocation(h).map(CatalogUtils.stringToURI),
         // To avoid ClassNotFound exception, we try our best to not get the format class, but get
         // the class name directly. However, for non-native tables, there is no interface to get
         // the format class name, so we may still throw ClassNotFound in this case.
@@ -793,8 +777,7 @@ private[hive] class HiveClientImpl(
       spec: TablePartitionSpec): Option[CatalogTablePartition] = withHiveState {
     val hiveTable = rawHiveTable.rawTable.asInstanceOf[HiveTable]
     val hivePartition = shim.getPartition(client, hiveTable, spec.asJava, false)
-    Option(hivePartition)
-      .map(fromHivePartition(_, rawHiveTable.toCatalogTable.storage.locationUri))
+    Option(hivePartition).map(fromHivePartition)
   }
 
   override def getPartitions(
@@ -816,10 +799,7 @@ private[hive] class HiveClientImpl(
         assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid")
         s
     }
-    val absoluteUri = shim.getDataLocation(hiveTable).map(stringToURI).filterNot(_.isAbsolute)
-      .map(_ => stringToURI(client.getDatabase(hiveTable.getDbName).getLocationUri))
-    val parts = shim.getPartitions(client, hiveTable, partSpec.asJava)
-      .map(fromHivePartition(_, absoluteUri))
+    val parts = shim.getPartitions(client, hiveTable, partSpec.asJava).map(fromHivePartition)
     HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
     parts
   }
@@ -829,9 +809,8 @@ private[hive] class HiveClientImpl(
       predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState {
     val hiveTable = rawHiveTable.rawTable.asInstanceOf[HiveTable]
     hiveTable.setOwner(userName)
-    val parts =
-      shim.getPartitionsByFilter(client, hiveTable, predicates, rawHiveTable.toCatalogTable)
-        .map(fromHivePartition(_, rawHiveTable.toCatalogTable.storage.locationUri))
+    val parts = shim.getPartitionsByFilter(
+      client, hiveTable, predicates, rawHiveTable.toCatalogTable).map(fromHivePartition)
     HiveCatalogMetrics.incrementFetchedPartitions(parts.length)
     parts
   }
@@ -1212,7 +1191,7 @@ private[hive] object HiveClientImpl extends Logging {
   /**
    * Build the native partition metadata from Hive's Partition.
    */
-  def fromHivePartition(hp: HivePartition, absoluteUri: Option[URI]): CatalogTablePartition = {
+  def fromHivePartition(hp: HivePartition): CatalogTablePartition = {
     val apiPartition = hp.getTPartition
     val properties: Map[String, String] = if (hp.getParameters != null) {
       hp.getParameters.asScala.toMap
@@ -1222,8 +1201,7 @@ private[hive] object HiveClientImpl extends Logging {
     CatalogTablePartition(
       spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty),
       storage = CatalogStorageFormat(
-        locationUri = Option(HiveExternalCatalog.toAbsoluteURI(
-          stringToURI(apiPartition.getSd.getLocation), absoluteUri)),
+        locationUri = Option(CatalogUtils.stringToURI(apiPartition.getSd.getLocation)),
         inputFormat = Option(apiPartition.getSd.getInputFormat),
         outputFormat = Option(apiPartition.getSd.getOutputFormat),
         serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib),
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index 7c36198c326..e413e0ee73c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -200,19 +200,4 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
       assert(alteredTable.provider === Some("foo"))
     })
   }
-
-  test("SPARK-39203: Rewrite table location to absolute location based on 
database location") {
-    val tableLocation1 = CatalogUtils.stringToURI("/user/hive/warehouse/t1")
-    val tableLocation2 = CatalogUtils.stringToURI("viewfs://clusterB/user/hive/warehouse/t2")
-    val dbLocation = CatalogUtils.stringToURI("viewfs://clusterA/user/hive/warehouse/")
-
-    assert(HiveExternalCatalog.toAbsoluteURI(tableLocation1, Some(dbLocation))
-      .equals(CatalogUtils.stringToURI("viewfs://clusterA/user/hive/warehouse/t1")))
-
-    assert(HiveExternalCatalog.toAbsoluteURI(tableLocation1, None)
-      .equals(tableLocation1))
-
-    assert(HiveExternalCatalog.toAbsoluteURI(tableLocation2, Some(dbLocation))
-      .equals(tableLocation2))
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
