spark git commit: [SPARK-19599][SS] Clean up HDFSMetadataLog
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 6c3539906 -> 88c43f4fb

[SPARK-19599][SS] Clean up HDFSMetadataLog

## What changes were proposed in this pull request?

SPARK-19464 removed support for Hadoop 2.5 and earlier, so we can do some cleanup for HDFSMetadataLog. This PR includes the following changes:

- ~~Remove the workaround codes for HADOOP-10622.~~ Unfortunately, there is another issue [HADOOP-14084](https://issues.apache.org/jira/browse/HADOOP-14084) that prevents us from removing the workaround codes.
- Remove the unnecessary `writer: (T, OutputStream) => Unit` parameter and just call `serialize` directly.
- Remove catching FileNotFoundException.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu

Closes #16932 from zsxwing/metadata-cleanup.

(cherry picked from commit 21b4ba2d6f21a9759af879471715c123073bd67a)
Signed-off-by: Shixiong Zhu

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88c43f4f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88c43f4f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88c43f4f

Branch: refs/heads/branch-2.1
Commit: 88c43f4fb5ea042a119819c11a5cdbe225095c54
Parents: 6c35399
Author: Shixiong Zhu
Authored: Wed Feb 15 16:21:43 2017 -0800
Committer: Shixiong Zhu
Committed: Wed Feb 15 16:21:49 2017 -0800

----------------------------------------------------------------------
 .../execution/streaming/HDFSMetadataLog.scala | 39 +---
 .../execution/streaming/StreamExecution.scala |  4 +-
 2 files changed, 19 insertions(+), 24 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/88c43f4f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 1b41352..e6a48a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -114,15 +114,18 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
       case ut: UninterruptibleThread =>
         // When using a local file system, "writeBatch" must be called on a
         // [[org.apache.spark.util.UninterruptibleThread]] so that interrupts can be disabled
-        // while writing the batch file. This is because there is a potential dead-lock in
-        // Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). If the thread running
-        // "Shell.runCommand" is interrupted, then the thread can get deadlocked. In our case,
-        // `writeBatch` creates a file using HDFS API and will call "Shell.runCommand" to set
-        // the file permission if using the local file system, and can get deadlocked if the
-        // stream execution thread is stopped by interrupt. Hence, we make sure that
-        // "writeBatch" is called on [[UninterruptibleThread]] which allows us to disable
-        // interrupts here. Also see SPARK-14131.
-        ut.runUninterruptibly { writeBatch(batchId, metadata, serialize) }
+        // while writing the batch file.
+        //
+        // This is because Hadoop "Shell.runCommand" swallows InterruptException (HADOOP-14084).
+        // If the user tries to stop a query, and the thread running "Shell.runCommand" is
+        // interrupted, then InterruptException will be dropped and the query will be still
+        // running. (Note: `writeBatch` creates a file using HDFS APIs and will call
+        // "Shell.runCommand" to set the file permission if using the local file system)
+        //
+        // Hence, we make sure that "writeBatch" is called on [[UninterruptibleThread]] which
+        // allows us to disable interrupts here, in order to propagate the interrupt state
+        // correctly. Also see SPARK-19599.
+        ut.runUninterruptibly { writeBatch(batchId, metadata) }
       case _ =>
         throw new IllegalStateException(
           "HDFSMetadataLog.add() on a local file system must be executed on " +
@@ -132,20 +135,19 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
       // For a distributed file system, such as HDFS or S3, if the network is broken, write
       // operations may just hang until timeout. We should enable interrupts to allow stopping
       // the query fast.
-      writeBatch(batchId, metadata, serialize)
+      writeBatch(batchId, metadata)
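The thread-dispatch guard in the hunk above can be sketched outside Spark. This is a hypothetical, simplified model: `UninterruptibleThread`, `runUninterruptibly`, and `writeBatch` below are stand-ins for the real Spark classes, and the real `runUninterruptibly` actually masks interrupts delivered while its body runs, which this sketch does not implement — it only models the type-based dispatch.

```scala
// Simplified stand-in for org.apache.spark.util.UninterruptibleThread.
// The real class disables interrupts during runUninterruptibly and re-delivers
// them afterwards; here we only model the dispatch on the thread's type.
class UninterruptibleThread(name: String)(body: => Unit) extends Thread(name) {
  override def run(): Unit = body
  def runUninterruptibly[A](f: => A): A = f // real version masks interrupts around f
}

// Stand-in for HDFSMetadataLog.writeBatch (the real one writes a batch file).
def writeBatch(batchId: Long, metadata: String): Unit =
  println(s"wrote batch $batchId: $metadata")

// Mirrors the dispatch in HDFSMetadataLog.add(): on a local file system the
// write must run with interrupts disabled, so it must happen on an
// UninterruptibleThread; on a distributed file system interrupts stay enabled
// so a hung write can be stopped quickly.
def add(batchId: Long, metadata: String, onLocalFs: Boolean): Unit =
  if (onLocalFs) {
    Thread.currentThread() match {
      case ut: UninterruptibleThread =>
        ut.runUninterruptibly { writeBatch(batchId, metadata) }
      case _ =>
        throw new IllegalStateException(
          "HDFSMetadataLog.add() on a local file system must be executed on " +
            "a org.apache.spark.util.UninterruptibleThread")
    }
  } else {
    writeBatch(batchId, metadata)
  }
```

Calling `add` with `onLocalFs = true` from an ordinary thread fails fast with `IllegalStateException`, which is the invariant the real code enforces for the stream execution thread.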
spark git commit: [SPARK-19599][SS] Clean up HDFSMetadataLog
Repository: spark
Updated Branches:
  refs/heads/master f6c3bba22 -> 21b4ba2d6

[SPARK-19599][SS] Clean up HDFSMetadataLog

## What changes were proposed in this pull request?

SPARK-19464 removed support for Hadoop 2.5 and earlier, so we can do some cleanup for HDFSMetadataLog. This PR includes the following changes:

- ~~Remove the workaround codes for HADOOP-10622.~~ Unfortunately, there is another issue [HADOOP-14084](https://issues.apache.org/jira/browse/HADOOP-14084) that prevents us from removing the workaround codes.
- Remove the unnecessary `writer: (T, OutputStream) => Unit` parameter and just call `serialize` directly.
- Remove catching FileNotFoundException.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu

Closes #16932 from zsxwing/metadata-cleanup.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21b4ba2d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21b4ba2d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21b4ba2d

Branch: refs/heads/master
Commit: 21b4ba2d6f21a9759af879471715c123073bd67a
Parents: f6c3bba
Author: Shixiong Zhu
Authored: Wed Feb 15 16:21:43 2017 -0800
Committer: Shixiong Zhu
Committed: Wed Feb 15 16:21:43 2017 -0800

----------------------------------------------------------------------
 .../execution/streaming/HDFSMetadataLog.scala | 39 +---
 .../execution/streaming/StreamExecution.scala |  4 +-
 2 files changed, 19 insertions(+), 24 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/21b4ba2d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index bfdc2cb..3155ce0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -114,15 +114,18 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
       case ut: UninterruptibleThread =>
         // When using a local file system, "writeBatch" must be called on a
         // [[org.apache.spark.util.UninterruptibleThread]] so that interrupts can be disabled
-        // while writing the batch file. This is because there is a potential dead-lock in
-        // Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). If the thread running
-        // "Shell.runCommand" is interrupted, then the thread can get deadlocked. In our case,
-        // `writeBatch` creates a file using HDFS API and will call "Shell.runCommand" to set
-        // the file permission if using the local file system, and can get deadlocked if the
-        // stream execution thread is stopped by interrupt. Hence, we make sure that
-        // "writeBatch" is called on [[UninterruptibleThread]] which allows us to disable
-        // interrupts here. Also see SPARK-14131.
-        ut.runUninterruptibly { writeBatch(batchId, metadata, serialize) }
+        // while writing the batch file.
+        //
+        // This is because Hadoop "Shell.runCommand" swallows InterruptException (HADOOP-14084).
+        // If the user tries to stop a query, and the thread running "Shell.runCommand" is
+        // interrupted, then InterruptException will be dropped and the query will be still
+        // running. (Note: `writeBatch` creates a file using HDFS APIs and will call
+        // "Shell.runCommand" to set the file permission if using the local file system)
+        //
+        // Hence, we make sure that "writeBatch" is called on [[UninterruptibleThread]] which
+        // allows us to disable interrupts here, in order to propagate the interrupt state
+        // correctly. Also see SPARK-19599.
+        ut.runUninterruptibly { writeBatch(batchId, metadata) }
       case _ =>
         throw new IllegalStateException(
           "HDFSMetadataLog.add() on a local file system must be executed on " +
@@ -132,20 +135,19 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
       // For a distributed file system, such as HDFS or S3, if the network is broken, write
       // operations may just hang until timeout. We should enable interrupts to allow stopping
       // the query fast.
-      writeBatch(batchId, metadata, serialize)
+      writeBatch(batchId, metadata)
     }
     true
   }
 }

-  def writeTempBatch(metadata: T, writer: (T, OutputStream) => Unit =
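The second cleanup item — dropping the `writer: (T, OutputStream) => Unit` parameter whose default was always `serialize` — can be illustrated with a minimal sketch. The class and signatures below are simplified, hypothetical stand-ins rather than the real Spark API (the real `writeTempBatch`, for instance, manages its own temp file rather than taking an `OutputStream`):

```scala
import java.io.OutputStream
import java.nio.charset.StandardCharsets

// Simplified stand-in for HDFSMetadataLog: a log parameterized by a
// serialization routine for its metadata type.
abstract class MetadataLog[T] {
  protected def serialize(metadata: T, out: OutputStream): Unit

  // Before the cleanup (sketched):
  //   def writeTempBatch(metadata: T, writer: (T, OutputStream) => Unit = serialize): ...
  // After: the writer indirection is gone and serialize is called directly.
  def writeTempBatch(metadata: T, out: OutputStream): Unit =
    serialize(metadata, out)
}

// Example subclass supplying a concrete serializer.
class StringMetadataLog extends MetadataLog[String] {
  protected def serialize(metadata: String, out: OutputStream): Unit =
    out.write(metadata.getBytes(StandardCharsets.UTF_8))
}
```

Removing a function-valued parameter that only ever received its default simplifies every call site, which is exactly what the `writeBatch(batchId, metadata)` changes in the diff show.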