Repository: spark
Updated Branches:
  refs/heads/master 24587ce43 -> 13945dd83


[SPARK-14109][SQL] Fix HDFSMetadataLog to fallback from FileContext to FileSystem API

## What changes were proposed in this pull request?

HDFSMetadataLog uses the newer FileContext API to achieve atomic renaming. However, 
FileContext implementations may not exist for many schemes for which there are 
FileSystem implementations. In those cases, rather than failing completely, we 
should fall back to the FileSystem-based implementation and log a warning that 
there may be file consistency issues if the log directory is modified concurrently.
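
The heart of the change is a small selection step: try to obtain a `FileContext` for the metadata path, and fall back to the `FileSystem` API if the path's scheme has no `AbstractFileSystem` binding. A minimal, self-contained sketch of that pattern (the actual patch hides both APIs behind a `FileManager` trait with `FileContextManager` and `FileSystemManager` implementations; the `resolveFileApi` helper here is only illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, FileSystem, Path, UnsupportedFileSystemException}

// Prefer FileContext, which supports atomic rename; fall back to FileSystem when
// the scheme only has a FileSystem implementation registered.
def resolveFileApi(path: Path, conf: Configuration): Either[FileContext, FileSystem] = {
  try {
    Left(FileContext.getFileContext(path.toUri, conf))
  } catch {
    case _: UnsupportedFileSystemException =>
      // No AbstractFileSystem binding for this scheme; renames through FileSystem
      // are not atomic, so a concurrently modified log directory may be inconsistent.
      Right(path.getFileSystem(conf))
  }
}
```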

In addition, I have added more tests to increase code coverage.

## How was this patch tested?

Unit tests.
Also tested on a cluster with a custom file system.
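
The fallback is exercised by registering a scheme that has a `FileSystem` implementation but no `FileContext` binding. A hedged sketch of that setup (the `FakeFileSystem` class and the `fs.<scheme>.impl` setting mirror the new test; the `fakefs` scheme name and log path are illustrative, and the suite additionally disables FileSystem caching for the scheme):

```scala
import java.net.URI
import org.apache.hadoop.fs.RawLocalFileSystem

// A FileSystem-only scheme: FileContext.getFileContext throws
// UnsupportedFileSystemException for it, which triggers the FileSystem fallback.
class FakeFileSystem extends RawLocalFileSystem {
  override def getUri: URI = URI.create("fakefs:///") // "fakefs" is an illustrative scheme name
}

// Register the scheme on the Hadoop configuration used by the SQLContext, then
// point the metadata log at it so writes go through the FileSystem-based manager.
sqlContext.sparkContext.hadoopConfiguration.set("fs.fakefs.impl", classOf[FakeFileSystem].getName)
val metadataLog = new HDFSMetadataLog[String](sqlContext, "fakefs:///tmp/metadata-log")
assert(metadataLog.add(0, "batch0"))
```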

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #11925 from tdas/SPARK-14109.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/13945dd8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/13945dd8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/13945dd8

Branch: refs/heads/master
Commit: 13945dd83bfa47ebd05181bda5a7c3e412feb5c0
Parents: 24587ce
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Fri Mar 25 20:07:54 2016 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Fri Mar 25 20:07:54 2016 -0700

----------------------------------------------------------------------
 .../execution/streaming/HDFSMetadataLog.scala   | 179 +++++++++++++++++--
 .../streaming/HDFSMetadataLogSuite.scala        | 122 ++++++++++++-
 .../spark/sql/test/SharedSQLContext.scala       |   5 +-
 .../apache/spark/sql/test/TestSQLContext.scala  |   8 +-
 4 files changed, 288 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/13945dd8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index f27d23b..9663fee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.execution.streaming
 
 import java.io.{FileNotFoundException, IOException}
 import java.nio.ByteBuffer
-import java.util.{ConcurrentModificationException, EnumSet}
+import java.util.{ConcurrentModificationException, EnumSet, UUID}
 
 import scala.reflect.ClassTag
 
 import org.apache.commons.io.IOUtils
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.permission.FsPermission
 
@@ -32,6 +33,7 @@ import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.sql.SQLContext
 
+
 /**
  * A [[MetadataLog]] implementation based on HDFS. [[HDFSMetadataLog]] uses the specified `path`
  * as the metadata storage.
@@ -47,17 +49,13 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
   extends MetadataLog[T]
   with Logging {
 
-  private val metadataPath = new Path(path)
+  import HDFSMetadataLog._
 
-  private val fc =
-    if (metadataPath.toUri.getScheme == null) {
-      FileContext.getFileContext(sqlContext.sparkContext.hadoopConfiguration)
-    } else {
-      FileContext.getFileContext(metadataPath.toUri, sqlContext.sparkContext.hadoopConfiguration)
-    }
+  private val metadataPath = new Path(path)
+  private val fileManager = createFileManager()
 
-  if (!fc.util().exists(metadataPath)) {
-    fc.mkdir(metadataPath, FsPermission.getDirDefault, true)
+  if (!fileManager.exists(metadataPath)) {
+    fileManager.mkdirs(metadataPath)
   }
 
   /**
@@ -104,10 +102,9 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
     // Use nextId to create a temp file
     var nextId = 0
     while (true) {
-      val tempPath = new Path(metadataPath, s".${batchId}_$nextId.tmp")
-      fc.deleteOnExit(tempPath)
+      val tempPath = new Path(metadataPath, s".${UUID.randomUUID.toString}.tmp")
       try {
-        val output = fc.create(tempPath, EnumSet.of(CreateFlag.CREATE))
+        val output = fileManager.create(tempPath)
         try {
           output.write(bytes)
         } finally {
@@ -117,7 +114,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
           // Try to commit the batch
           // It will fail if there is an existing file (someone has committed the batch)
           logDebug(s"Attempting to write log #${batchFile(batchId)}")
-          fc.rename(tempPath, batchFile(batchId), Options.Rename.NONE)
+          fileManager.rename(tempPath, batchFile(batchId))
           return
         } catch {
           case e: IOException if isFileAlreadyExistsException(e) =>
@@ -147,6 +144,8 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
           // metadata path. In addition, the old Streaming also have this issue, people can create
           // malicious checkpoint files to crash a Streaming application too.
           nextId += 1
+      } finally {
+        fileManager.delete(tempPath)
       }
     }
   }
@@ -160,8 +159,8 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
 
   override def get(batchId: Long): Option[T] = {
     val batchMetadataFile = batchFile(batchId)
-    if (fc.util().exists(batchMetadataFile)) {
-      val input = fc.open(batchMetadataFile)
+    if (fileManager.exists(batchMetadataFile)) {
+      val input = fileManager.open(batchMetadataFile)
       val bytes = IOUtils.toByteArray(input)
       Some(serializer.deserialize[T](ByteBuffer.wrap(bytes)))
     } else {
@@ -171,7 +170,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
   }
 
   override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)] = {
-    val files = fc.util().listStatus(metadataPath, batchFilesFilter)
+    val files = fileManager.list(metadataPath, batchFilesFilter)
     val batchIds = files
       .map(_.getPath.getName.toLong)
       .filter { batchId =>
@@ -184,7 +183,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
   }
 
   override def getLatest(): Option[(Long, T)] = {
-    val batchIds = fc.util().listStatus(metadataPath, batchFilesFilter)
+    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
       .map(_.getPath.getName.toLong)
       .sorted
       .reverse
@@ -196,4 +195,148 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
     }
     None
   }
+
+  private def createFileManager(): FileManager = {
+    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
+    try {
+      new FileContextManager(metadataPath, hadoopConf)
+    } catch {
+      case e: UnsupportedFileSystemException =>
+        logWarning("Could not use FileContext API for managing metadata log 
file. The log may be" +
+          "inconsistent under failures.", e)
+        new FileSystemManager(metadataPath, hadoopConf)
+    }
+  }
+}
+
+object HDFSMetadataLog {
+
+  /** A simple trait to abstract out the file management operations needed by HDFSMetadataLog. */
+  trait FileManager {
+
+    /** List the files in a path that matches a filter. */
+    def list(path: Path, filter: PathFilter): Array[FileStatus]
+
+    /** Make directory at the give path and all its parent directories as needed. */
+    def mkdirs(path: Path): Unit
+
+    /** Whether path exists */
+    def exists(path: Path): Boolean
+
+    /** Open a file for reading, or throw exception if it does not exist. */
+    def open(path: Path): FSDataInputStream
+
+    /** Create path, or throw exception if it already exists */
+    def create(path: Path): FSDataOutputStream
+
+    /**
+     * Atomically rename path, or throw exception if it cannot be done.
+     * Should throw FileNotFoundException if srcPath does not exist.
+     * Should throw FileAlreadyExistsException if destPath already exists.
+     */
+    def rename(srcPath: Path, destPath: Path): Unit
+
+    /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */
+    def delete(path: Path): Unit
+  }
+
+  /**
+   * Default implementation of FileManager using newer FileContext API.
+   */
+  class FileContextManager(path: Path, hadoopConf: Configuration) extends FileManager {
+    private val fc = if (path.toUri.getScheme == null) {
+      FileContext.getFileContext(hadoopConf)
+    } else {
+      FileContext.getFileContext(path.toUri, hadoopConf)
+    }
+
+    override def list(path: Path, filter: PathFilter): Array[FileStatus] = {
+      fc.util.listStatus(path, filter)
+    }
+
+    override def rename(srcPath: Path, destPath: Path): Unit = {
+      fc.rename(srcPath, destPath)
+    }
+
+    override def mkdirs(path: Path): Unit = {
+      fc.mkdir(path, FsPermission.getDirDefault, true)
+    }
+
+    override def open(path: Path): FSDataInputStream = {
+      fc.open(path)
+    }
+
+    override def create(path: Path): FSDataOutputStream = {
+      fc.create(path, EnumSet.of(CreateFlag.CREATE))
+    }
+
+    override def exists(path: Path): Boolean = {
+      fc.util().exists(path)
+    }
+
+    override def delete(path: Path): Unit = {
+      try {
+        fc.delete(path, true)
+      } catch {
+        case e: FileNotFoundException =>
+        // ignore if file has already been deleted
+      }
+    }
+  }
+
+  /**
+   * Implementation of FileManager using older FileSystem API. Note that this implementation
+   * cannot provide atomic renaming of paths, hence can lead to consistency issues. This
+   * should be used only as a backup option, when FileContextManager cannot be used.
+   */
+  class FileSystemManager(path: Path, hadoopConf: Configuration) extends FileManager {
+    private val fs = path.getFileSystem(hadoopConf)
+
+    override def list(path: Path, filter: PathFilter): Array[FileStatus] = {
+      fs.listStatus(path, filter)
+    }
+
+    /**
+     * Rename a path. Note that this implementation is not atomic.
+     * @throws FileNotFoundException if source path does not exist.
+     * @throws FileAlreadyExistsException if destination path already exists.
+     * @throws IOException if renaming fails for some unknown reason.
+     */
+    override def rename(srcPath: Path, destPath: Path): Unit = {
+      if (!fs.exists(srcPath)) {
+        throw new FileNotFoundException(s"Source path does not exist: $srcPath")
+      }
+      if (fs.exists(destPath)) {
+        throw new FileAlreadyExistsException(s"Destination path already exists: $destPath")
+      }
+      if (!fs.rename(srcPath, destPath)) {
+        throw new IOException(s"Failed to rename $srcPath to $destPath")
+      }
+    }
+
+    override def mkdirs(path: Path): Unit = {
+      fs.mkdirs(path, FsPermission.getDirDefault)
+    }
+
+    override def open(path: Path): FSDataInputStream = {
+      fs.open(path)
+    }
+
+    override def create(path: Path): FSDataOutputStream = {
+      fs.create(path, false)
+    }
+
+    override def exists(path: Path): Boolean = {
+      fs.exists(path)
+    }
+
+    override def delete(path: Path): Unit = {
+      try {
+        fs.delete(path, true)
+      } catch {
+        case e: FileNotFoundException =>
+          // ignore if file has already been deleted
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/13945dd8/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 9ed5686..d5db9db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -17,21 +17,48 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.{File, FileNotFoundException, IOException}
+import java.net.URI
 import java.util.ConcurrentModificationException
 
+import scala.util.Random
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
 import org.scalatest.concurrent.AsyncAssertions._
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.sql.execution.streaming.FakeFileSystem._
+import org.apache.spark.sql.execution.streaming.HDFSMetadataLog.{FileContextManager, FileManager, FileSystemManager}
 import org.apache.spark.sql.test.SharedSQLContext
 
 class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
 
+  /** To avoid caching of FS objects */
+  override protected val sparkConf =
+    new SparkConf().set(s"spark.hadoop.fs.$scheme.impl.disable.cache", "true")
+
   private implicit def toOption[A](a: A): Option[A] = Option(a)
 
-  test("basic") {
+  test("FileManager: FileContextManager") {
     withTempDir { temp =>
-      val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath)
+      val path = new Path(temp.getAbsolutePath)
+      testManager(path, new FileContextManager(path, new Configuration))
+    }
+  }
+
+  test("FileManager: FileSystemManager") {
+    withTempDir { temp =>
+      val path = new Path(temp.getAbsolutePath)
+      testManager(path, new FileSystemManager(path, new Configuration))
+    }
+  }
+
+  test("HDFSMetadataLog: basic") {
+    withTempDir { temp =>
+      val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir
+      val metadataLog = new HDFSMetadataLog[String](sqlContext, dir.getAbsolutePath)
       assert(metadataLog.add(0, "batch0"))
       assert(metadataLog.getLatest() === Some(0 -> "batch0"))
       assert(metadataLog.get(0) === Some("batch0"))
@@ -53,7 +80,27 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  test("restart") {
+  testQuietly("HDFSMetadataLog: fallback from FileContext to FileSystem") {
+    sqlContext.sparkContext.hadoopConfiguration.set(
+      s"fs.$scheme.impl",
+      classOf[FakeFileSystem].getName)
+    withTempDir { temp =>
+      val metadataLog = new HDFSMetadataLog[String](sqlContext, s"$scheme://$temp")
+      assert(metadataLog.add(0, "batch0"))
+      assert(metadataLog.getLatest() === Some(0 -> "batch0"))
+      assert(metadataLog.get(0) === Some("batch0"))
+      assert(metadataLog.get(None, 0) === Array(0 -> "batch0"))
+
+
+      val metadataLog2 = new HDFSMetadataLog[String](sqlContext, s"$scheme://$temp")
+      assert(metadataLog2.get(0) === Some("batch0"))
+      assert(metadataLog2.getLatest() === Some(0 -> "batch0"))
+      assert(metadataLog2.get(None, 0) === Array(0 -> "batch0"))
+
+    }
+  }
+
+  test("HDFSMetadataLog: restart") {
     withTempDir { temp =>
       val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath)
       assert(metadataLog.add(0, "batch0"))
@@ -71,7 +118,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  test("metadata directory collision") {
+  test("HDFSMetadataLog: metadata directory collision") {
     withTempDir { temp =>
       val waiter = new Waiter
       val maxBatchId = 100
@@ -102,4 +149,69 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
       assert(metadataLog.get(None, maxBatchId) === (0 to maxBatchId).map(i => (i, i.toString)))
     }
   }
+
+
+  def testManager(basePath: Path, fm: FileManager): Unit = {
+    // Mkdirs
+    val dir = new Path(s"$basePath/dir/subdir/subsubdir")
+    assert(!fm.exists(dir))
+    fm.mkdirs(dir)
+    assert(fm.exists(dir))
+    fm.mkdirs(dir)
+
+    // List
+    val acceptAllFilter = new PathFilter {
+      override def accept(path: Path): Boolean = true
+    }
+    val rejectAllFilter = new PathFilter {
+      override def accept(path: Path): Boolean = false
+    }
+    assert(fm.list(basePath, acceptAllFilter).exists(_.getPath.getName == "dir"))
+    assert(fm.list(basePath, rejectAllFilter).length === 0)
+
+    // Create
+    val path = new Path(s"$dir/file")
+    assert(!fm.exists(path))
+    fm.create(path).close()
+    assert(fm.exists(path))
+    intercept[IOException] {
+      fm.create(path)
+    }
+
+    // Open and delete
+    fm.open(path)
+    fm.delete(path)
+    assert(!fm.exists(path))
+    intercept[IOException] {
+      fm.open(path)
+    }
+    fm.delete(path)  // should not throw exception
+
+    // Rename
+    val path1 = new Path(s"$dir/file1")
+    val path2 = new Path(s"$dir/file2")
+    fm.create(path1).close()
+    assert(fm.exists(path1))
+    fm.rename(path1, path2)
+    intercept[FileNotFoundException] {
+      fm.rename(path1, path2)
+    }
+    val path3 = new Path(s"$dir/file3")
+    fm.create(path3).close()
+    assert(fm.exists(path3))
+    intercept[FileAlreadyExistsException] {
+      fm.rename(path2, path3)
+    }
+  }
+}
+
+/** FakeFileSystem to test fallback of the HDFSMetadataLog from FileContext to FileSystem API */
+class FakeFileSystem extends RawLocalFileSystem {
+  override def getUri: URI = {
+    URI.create(s"$scheme:///")
+  }
+}
+
+object FakeFileSystem {
+  val scheme = s"HDFSMetadataLogSuite${math.abs(Random.nextInt)}"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/13945dd8/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index c341191..914c6a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.test
 
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.SQLContext
 
 
@@ -25,6 +26,8 @@ import org.apache.spark.sql.SQLContext
  */
 trait SharedSQLContext extends SQLTestUtils {
 
+  protected val sparkConf = new SparkConf()
+
   /**
    * The [[TestSQLContext]] to use for all tests in this suite.
    *
@@ -44,7 +47,7 @@ trait SharedSQLContext extends SQLTestUtils {
   protected override def beforeAll(): Unit = {
     SQLContext.clearSqlListener()
     if (_ctx == null) {
-      _ctx = new TestSQLContext
+      _ctx = new TestSQLContext(sparkConf)
     }
     // Ensure we have initialized the context before calling parent code
     super.beforeAll()

http://git-wip-us.apache.org/repos/asf/spark/blob/13945dd8/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
index b3e146f..7ab79b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -26,9 +26,13 @@ import org.apache.spark.sql.internal.{SessionState, SQLConf}
  */
 private[sql] class TestSQLContext(sc: SparkContext) extends SQLContext(sc) { self =>
 
-  def this() {
+  def this(sparkConf: SparkConf) {
     this(new SparkContext("local[2]", "test-sql-context",
-      new SparkConf().set("spark.sql.testkey", "true")))
+      sparkConf.set("spark.sql.testkey", "true")))
+  }
+
+  def this() {
+    this(new SparkConf)
   }
 
   @transient

