Repository: spark
Updated Branches:
  refs/heads/master 1dd925644 -> e4ce1bc4f


[SPARK-15659][SQL] Ensure FileSystem is gotten from path

## What changes were proposed in this pull request?

Currently `spark.sql.warehouse.dir` is pointed to local dir by default, which 
will throw exception when HADOOP_CONF_DIR is configured and default FS is hdfs.

```
java.lang.IllegalArgumentException: Wrong FS: 
file:/Users/sshao/projects/apache-spark/spark-warehouse, expected: 
hdfs://localhost:8020
```

So we should always get the `FileSystem` from `Path` to avoid wrong FS problem.

## How was this patch tested?

Local test.

Author: jerryshao <ss...@hortonworks.com>

Closes #13405 from jerryshao/SPARK-15659.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e4ce1bc4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e4ce1bc4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e4ce1bc4

Branch: refs/heads/master
Commit: e4ce1bc4f3ca088365ff199e563f23a552dc88ef
Parents: 1dd9256
Author: jerryshao <ss...@hortonworks.com>
Authored: Wed Jun 1 08:28:19 2016 -0500
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Jun 1 08:28:19 2016 -0500

----------------------------------------------------------------------
 .../sql/catalyst/catalog/InMemoryCatalog.scala    | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e4ce1bc4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 489a1c8..6052579 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -22,7 +22,7 @@ import java.io.IOException
 import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.AnalysisException
@@ -105,8 +105,6 @@ class InMemoryCatalog(hadoopConfig: Configuration = new 
Configuration) extends E
     }
   }
 
-  private val fs = FileSystem.get(hadoopConfig)
-
   // --------------------------------------------------------------------------
   // Databases
   // --------------------------------------------------------------------------
@@ -120,7 +118,9 @@ class InMemoryCatalog(hadoopConfig: Configuration = new 
Configuration) extends E
       }
     } else {
       try {
-        fs.mkdirs(new Path(dbDefinition.locationUri))
+        val location = new Path(dbDefinition.locationUri)
+        val fs = location.getFileSystem(hadoopConfig)
+        fs.mkdirs(location)
       } catch {
         case e: IOException =>
           throw new SparkException(s"Unable to create database 
${dbDefinition.name} as failed " +
@@ -147,7 +147,9 @@ class InMemoryCatalog(hadoopConfig: Configuration = new 
Configuration) extends E
       // Remove the database.
       val dbDefinition = catalog(db).db
       try {
-        fs.delete(new Path(dbDefinition.locationUri), true)
+        val location = new Path(dbDefinition.locationUri)
+        val fs = location.getFileSystem(hadoopConfig)
+        fs.delete(location, true)
       } catch {
         case e: IOException =>
           throw new SparkException(s"Unable to drop database 
${dbDefinition.name} as failed " +
@@ -203,6 +205,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new 
Configuration) extends E
       if (tableDefinition.tableType == CatalogTableType.MANAGED) {
         val dir = new Path(catalog(db).db.locationUri, table)
         try {
+          val fs = dir.getFileSystem(hadoopConfig)
           fs.mkdirs(dir)
         } catch {
           case e: IOException =>
@@ -223,6 +226,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new 
Configuration) extends E
       if (getTable(db, table).tableType == CatalogTableType.MANAGED) {
         val dir = new Path(catalog(db).db.locationUri, table)
         try {
+          val fs = dir.getFileSystem(hadoopConfig)
           fs.delete(dir, true)
         } catch {
           case e: IOException =>
@@ -248,6 +252,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new 
Configuration) extends E
       val oldDir = new Path(catalog(db).db.locationUri, oldName)
       val newDir = new Path(catalog(db).db.locationUri, newName)
       try {
+        val fs = oldDir.getFileSystem(hadoopConfig)
         fs.rename(oldDir, newDir)
       } catch {
         case e: IOException =>
@@ -338,6 +343,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new 
Configuration) extends E
           p.spec.get(col).map(col + "=" + _)
         }.mkString("/")
         try {
+          val fs = tableDir.getFileSystem(hadoopConfig)
           fs.mkdirs(new Path(tableDir, partitionPath))
         } catch {
           case e: IOException =>
@@ -373,6 +379,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new 
Configuration) extends E
           p.get(col).map(col + "=" + _)
         }.mkString("/")
         try {
+          val fs = tableDir.getFileSystem(hadoopConfig)
           fs.delete(new Path(tableDir, partitionPath), true)
         } catch {
           case e: IOException =>
@@ -409,6 +416,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new 
Configuration) extends E
           newSpec.get(col).map(col + "=" + _)
         }.mkString("/")
         try {
+          val fs = tableDir.getFileSystem(hadoopConfig)
           fs.rename(new Path(tableDir, oldPath), new Path(tableDir, newPath))
         } catch {
           case e: IOException =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to