spark git commit: [SPARK-19599][SS] Clean up HDFSMetadataLog

2017-02-15 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 6c3539906 -> 88c43f4fb


[SPARK-19599][SS] Clean up HDFSMetadataLog

## What changes were proposed in this pull request?

SPARK-19464 removed support for Hadoop 2.5 and earlier, so we can do some 
cleanup for HDFSMetadataLog.

This PR includes the following changes:
- ~~Remove the workaround codes for HADOOP-10622.~~ Unfortunately, there is 
another issue 
[HADOOP-14084](https://issues.apache.org/jira/browse/HADOOP-14084) that 
prevents us from removing the workaround codes.
- Remove unnecessary `writer: (T, OutputStream) => Unit` and just call `serialize` directly (see the sketch after this list).
- Remove catching FileNotFoundException.
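
For illustration, here is a minimal, self-contained sketch of the `writer` cleanup. The toy class below is hypothetical (it is not Spark's actual `HDFSMetadataLog`); it only shows the shape of the change: `writeBatch` no longer threads a `writer: (T, OutputStream) => Unit` function through the call and instead calls the log's own `serialize` directly.

```scala
import java.io.{ByteArrayOutputStream, OutputStream}

// Hypothetical stand-in for HDFSMetadataLog, reduced to the relevant shape.
class ToyMetadataLog[T <: AnyRef] {

  // Toy serializer; the real log writes its own format for batch metadata.
  def serialize(metadata: T, out: OutputStream): Unit =
    out.write(metadata.toString.getBytes("UTF-8"))

  // After the cleanup: no `writer` parameter, serialize is called directly.
  def writeBatch(batchId: Long, metadata: T): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    serialize(metadata, out)
    out.toByteArray
  }
}

object ToyMetadataLogExample extends App {
  val log = new ToyMetadataLog[String]
  // Before the cleanup, this call site had to pass `serialize` as an extra argument.
  println(log.writeBatch(0L, "batch-0 offsets").length)
}
```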

## How was this patch tested?

Jenkins

Author: Shixiong Zhu 

Closes #16932 from zsxwing/metadata-cleanup.

(cherry picked from commit 21b4ba2d6f21a9759af879471715c123073bd67a)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88c43f4f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88c43f4f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88c43f4f

Branch: refs/heads/branch-2.1
Commit: 88c43f4fb5ea042a119819c11a5cdbe225095c54
Parents: 6c35399
Author: Shixiong Zhu 
Authored: Wed Feb 15 16:21:43 2017 -0800
Committer: Shixiong Zhu 
Committed: Wed Feb 15 16:21:49 2017 -0800

--
 .../execution/streaming/HDFSMetadataLog.scala   | 39 +---
 .../execution/streaming/StreamExecution.scala   |  4 +-
 2 files changed, 19 insertions(+), 24 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/88c43f4f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 1b41352..e6a48a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -114,15 +114,18 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
   case ut: UninterruptibleThread =>
 // When using a local file system, "writeBatch" must be called on a
 // [[org.apache.spark.util.UninterruptibleThread]] so that interrupts can be disabled
-// while writing the batch file. This is because there is a potential dead-lock in
-// Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). If the thread running
-// "Shell.runCommand" is interrupted, then the thread can get deadlocked. In our case,
-// `writeBatch` creates a file using HDFS API and will call "Shell.runCommand" to set
-// the file permission if using the local file system, and can get deadlocked if the
-// stream execution thread is stopped by interrupt. Hence, we make sure that
-// "writeBatch" is called on [[UninterruptibleThread]] which allows us to disable
-// interrupts here. Also see SPARK-14131.
-ut.runUninterruptibly { writeBatch(batchId, metadata, serialize) }
+// while writing the batch file.
+//
+// This is because Hadoop "Shell.runCommand" swallows InterruptedException (HADOOP-14084).
+// If the user tries to stop a query, and the thread running "Shell.runCommand" is
+// interrupted, then the InterruptedException will be dropped and the query will still be
+// running. (Note: `writeBatch` creates a file using HDFS APIs and will call
+// "Shell.runCommand" to set the file permission if using the local file system.)
+//
+// Hence, we make sure that "writeBatch" is called on [[UninterruptibleThread]], which
+// allows us to disable interrupts here, in order to propagate the interrupt state
+// correctly. Also see SPARK-19599.
+ut.runUninterruptibly { writeBatch(batchId, metadata) }
   case _ =>
 throw new IllegalStateException(
   "HDFSMetadataLog.add() on a local file system must be executed 
on " +
@@ -132,20 +135,19 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
 // For a distributed file system, such as HDFS or S3, if the network is broken, write
 // operations may just hang until timeout. We should enable interrupts to allow stopping
 // the query fast.
-writeBatch(batchId, metadata, serialize)
+writeBatch(batchId, metadata)
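
As an aside, here is a hedged sketch of the pattern the new comment describes, using Spark's `org.apache.spark.util.UninterruptibleThread` (a Spark-internal utility). The object, thread name, and `doWrite` helper below are made up for illustration; this is not Spark's actual code.

```scala
// Placed under org.apache.spark so the (Spark-internal) thread class is
// accessible; adjust the package to your build if trying this elsewhere.
package org.apache.spark.example

import org.apache.spark.util.UninterruptibleThread

// Sketch: run a critical file write with interrupts deferred, mirroring the
// pattern in HDFSMetadataLog.add. An interrupt arriving while the block runs
// is re-delivered after it completes, so code such as Shell.runCommand
// (HADOOP-14084) cannot swallow it mid-write.
object UninterruptibleWriteExample extends App {

  // Hypothetical stand-in for writeBatch.
  def doWrite(): Unit = println("writing batch file ...")

  val worker = new UninterruptibleThread("toy-stream-execution-thread") {
    // runUninterruptibly must be invoked from the thread itself, so the
    // dispatch happens inside run(), just like the match in the diff above.
    override def run(): Unit = Thread.currentThread match {
      case ut: UninterruptibleThread =>
        ut.runUninterruptibly { doWrite() } // interrupts disabled in here
      case _ =>
        throw new IllegalStateException(
          "the batch write must run on an UninterruptibleThread")
    }
  }
  worker.start()
  worker.join()
}
```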

spark git commit: [SPARK-19599][SS] Clean up HDFSMetadataLog

2017-02-15 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master f6c3bba22 -> 21b4ba2d6


[SPARK-19599][SS] Clean up HDFSMetadataLog

## What changes were proposed in this pull request?

SPARK-19464 removed support for Hadoop 2.5 and earlier, so we can do some 
cleanup for HDFSMetadataLog.

This PR includes the following changes:
- ~~Remove the workaround codes for HADOOP-10622.~~ Unfortunately, there is 
another issue 
[HADOOP-14084](https://issues.apache.org/jira/browse/HADOOP-14084) that 
prevents us from removing the workaround codes.
- Remove unnecessary `writer: (T, OutputStream) => Unit` and just call 
`serialize` directly.
- Remove catching FileNotFoundException.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu 

Closes #16932 from zsxwing/metadata-cleanup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21b4ba2d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21b4ba2d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21b4ba2d

Branch: refs/heads/master
Commit: 21b4ba2d6f21a9759af879471715c123073bd67a
Parents: f6c3bba
Author: Shixiong Zhu 
Authored: Wed Feb 15 16:21:43 2017 -0800
Committer: Shixiong Zhu 
Committed: Wed Feb 15 16:21:43 2017 -0800

--
 .../execution/streaming/HDFSMetadataLog.scala   | 39 +---
 .../execution/streaming/StreamExecution.scala   |  4 +-
 2 files changed, 19 insertions(+), 24 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/21b4ba2d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index bfdc2cb..3155ce0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -114,15 +114,18 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
   case ut: UninterruptibleThread =>
 // When using a local file system, "writeBatch" must be called on a
 // [[org.apache.spark.util.UninterruptibleThread]] so that interrupts can be disabled
-// while writing the batch file. This is because there is a potential dead-lock in
-// Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). If the thread running
-// "Shell.runCommand" is interrupted, then the thread can get deadlocked. In our case,
-// `writeBatch` creates a file using HDFS API and will call "Shell.runCommand" to set
-// the file permission if using the local file system, and can get deadlocked if the
-// stream execution thread is stopped by interrupt. Hence, we make sure that
-// "writeBatch" is called on [[UninterruptibleThread]] which allows us to disable
-// interrupts here. Also see SPARK-14131.
-ut.runUninterruptibly { writeBatch(batchId, metadata, serialize) }
+// while writing the batch file.
+//
+// This is because Hadoop "Shell.runCommand" swallows InterruptedException (HADOOP-14084).
+// If the user tries to stop a query, and the thread running "Shell.runCommand" is
+// interrupted, then the InterruptedException will be dropped and the query will still be
+// running. (Note: `writeBatch` creates a file using HDFS APIs and will call
+// "Shell.runCommand" to set the file permission if using the local file system.)
+//
+// Hence, we make sure that "writeBatch" is called on [[UninterruptibleThread]], which
+// allows us to disable interrupts here, in order to propagate the interrupt state
+// correctly. Also see SPARK-19599.
+ut.runUninterruptibly { writeBatch(batchId, metadata) }
   case _ =>
 throw new IllegalStateException(
   "HDFSMetadataLog.add() on a local file system must be executed 
on " +
@@ -132,20 +135,19 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
 // For a distributed file system, such as HDFS or S3, if the network is broken, write
 // operations may just hang until timeout. We should enable interrupts to allow stopping
 // the query fast.
-writeBatch(batchId, metadata, serialize)
+writeBatch(batchId, metadata)
   }
   true
 }
   }
 
-  def writeTempBatch(metadata: T, writer: (T, OutputStream) => Unit =