spark git commit: [SPARK-20848][SQL][FOLLOW-UP] Shutdown the pool after reading parquet files
Repository: spark Updated Branches: refs/heads/branch-2.1 c3302e81e -> 7015f6f0e [SPARK-20848][SQL][FOLLOW-UP] Shutdown the pool after reading parquet files ## What changes were proposed in this pull request? This is a follow-up to #18073. Taking a safer approach to shutdown the pool to prevent possible issue. Also using `ThreadUtils.newForkJoinPool` instead to set a better thread name. ## How was this patch tested? Manually test. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi HsiehCloses #18100 from viirya/SPARK-20848-followup. (cherry picked from commit 6b68d61cf31748a088778dfdd66491b2f89a3c7b) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7015f6f0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7015f6f0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7015f6f0 Branch: refs/heads/branch-2.1 Commit: 7015f6f0e7889616db9595b4e68b5a5b5ffe921a Parents: c3302e8 Author: Liang-Chi Hsieh Authored: Thu May 25 09:55:45 2017 +0800 Committer: Wenchen Fan Committed: Thu May 25 09:56:16 2017 +0800 -- .../datasources/parquet/ParquetFileFormat.scala | 42 ++-- 1 file changed, 22 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7015f6f0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 70eb01c..f303a01 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -50,7 +50,7 @@ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ -import org.apache.spark.util.SerializableConfiguration +import org.apache.spark.util.{SerializableConfiguration, ThreadUtils} class ParquetFileFormat extends FileFormat @@ -496,27 +496,29 @@ object ParquetFileFormat extends Logging { partFiles: Seq[FileStatus], ignoreCorruptFiles: Boolean): Seq[Footer] = { val parFiles = partFiles.par -val pool = new ForkJoinPool(8) +val pool = ThreadUtils.newForkJoinPool("readingParquetFooters", 8) parFiles.tasksupport = new ForkJoinTaskSupport(pool) -parFiles.flatMap { currentFile => - try { -// Skips row group information since we only need the schema. -// ParquetFileReader.readFooter throws RuntimeException, instead of IOException, -// when it can't read the footer. -Some(new Footer(currentFile.getPath(), - ParquetFileReader.readFooter( -conf, currentFile, SKIP_ROW_GROUPS))) - } catch { case e: RuntimeException => -if (ignoreCorruptFiles) { - logWarning(s"Skipped the footer in the corrupted file: $currentFile", e) - None -} else { - throw new IOException(s"Could not read footer for file: $currentFile", e) +try { + parFiles.flatMap { currentFile => +try { + // Skips row group information since we only need the schema. + // ParquetFileReader.readFooter throws RuntimeException, instead of IOException, + // when it can't read the footer. 
+ Some(new Footer(currentFile.getPath(), +ParquetFileReader.readFooter( + conf, currentFile, SKIP_ROW_GROUPS))) +} catch { case e: RuntimeException => + if (ignoreCorruptFiles) { +logWarning(s"Skipped the footer in the corrupted file: $currentFile", e) +None + } else { +throw new IOException(s"Could not read footer for file: $currentFile", e) + } } - } finally { -pool.shutdown() - } -}.seq + }.seq +} finally { + pool.shutdown() +} } /** - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
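For reference, the shape of this fix can be shown with a small standalone sketch (assuming Scala 2.12's built-in parallel collections; this is not the Spark source itself): the pool is created per call, the entire parallel traversal runs inside the try block, and shutdown() is guaranteed in the finally even if reading any single footer throws. ThreadUtils.newForkJoinPool additionally gives the worker threads a recognizable name prefix; a plain ForkJoinPool is used below only to keep the sketch self-contained.

import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

object PooledParallelMap {
  // Run `f` over `items` on a dedicated bounded pool, and always release the pool.
  def mapInParallel[A, B](items: Seq[A], parallelism: Int)(f: A => B): Seq[B] = {
    val pool = new ForkJoinPool(parallelism)
    val par = items.par
    par.tasksupport = new ForkJoinTaskSupport(pool)
    try {
      par.map(f).seq   // the whole parallel run happens inside the try
    } finally {
      pool.shutdown()  // executed exactly once, even when f throws
    }
  }
}

Compared with calling shutdown() inside the per-element handler (as the original patch did), wrapping the whole traversal keeps the pool alive for every file and still guarantees cleanup on the error path.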
spark git commit: [SPARK-20848][SQL][FOLLOW-UP] Shutdown the pool after reading parquet files
Repository: spark Updated Branches: refs/heads/branch-2.2 3f82d65bf -> e0aa23939 [SPARK-20848][SQL][FOLLOW-UP] Shutdown the pool after reading parquet files ## What changes were proposed in this pull request? This is a follow-up to #18073. Taking a safer approach to shutdown the pool to prevent possible issue. Also using `ThreadUtils.newForkJoinPool` instead to set a better thread name. ## How was this patch tested? Manually test. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi HsiehCloses #18100 from viirya/SPARK-20848-followup. (cherry picked from commit 6b68d61cf31748a088778dfdd66491b2f89a3c7b) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e0aa2393 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e0aa2393 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e0aa2393 Branch: refs/heads/branch-2.2 Commit: e0aa23939a4cbf95f2cc83a7f5adee841b491358 Parents: 3f82d65 Author: Liang-Chi Hsieh Authored: Thu May 25 09:55:45 2017 +0800 Committer: Wenchen Fan Committed: Thu May 25 09:55:59 2017 +0800 -- .../datasources/parquet/ParquetFileFormat.scala | 42 ++-- 1 file changed, 22 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e0aa2393/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 29ed890..87fbf8b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -50,7 +50,7 @@ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ -import org.apache.spark.util.SerializableConfiguration +import org.apache.spark.util.{SerializableConfiguration, ThreadUtils} class ParquetFileFormat extends FileFormat @@ -479,27 +479,29 @@ object ParquetFileFormat extends Logging { partFiles: Seq[FileStatus], ignoreCorruptFiles: Boolean): Seq[Footer] = { val parFiles = partFiles.par -val pool = new ForkJoinPool(8) +val pool = ThreadUtils.newForkJoinPool("readingParquetFooters", 8) parFiles.tasksupport = new ForkJoinTaskSupport(pool) -parFiles.flatMap { currentFile => - try { -// Skips row group information since we only need the schema. -// ParquetFileReader.readFooter throws RuntimeException, instead of IOException, -// when it can't read the footer. -Some(new Footer(currentFile.getPath(), - ParquetFileReader.readFooter( -conf, currentFile, SKIP_ROW_GROUPS))) - } catch { case e: RuntimeException => -if (ignoreCorruptFiles) { - logWarning(s"Skipped the footer in the corrupted file: $currentFile", e) - None -} else { - throw new IOException(s"Could not read footer for file: $currentFile", e) +try { + parFiles.flatMap { currentFile => +try { + // Skips row group information since we only need the schema. + // ParquetFileReader.readFooter throws RuntimeException, instead of IOException, + // when it can't read the footer. 
+ Some(new Footer(currentFile.getPath(), +ParquetFileReader.readFooter( + conf, currentFile, SKIP_ROW_GROUPS))) +} catch { case e: RuntimeException => + if (ignoreCorruptFiles) { +logWarning(s"Skipped the footer in the corrupted file: $currentFile", e) +None + } else { +throw new IOException(s"Could not read footer for file: $currentFile", e) + } } - } finally { -pool.shutdown() - } -}.seq + }.seq +} finally { + pool.shutdown() +} } /** - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20848][SQL][FOLLOW-UP] Shutdown the pool after reading parquet files
Repository: spark Updated Branches: refs/heads/master 197f9018a -> 6b68d61cf [SPARK-20848][SQL][FOLLOW-UP] Shutdown the pool after reading parquet files ## What changes were proposed in this pull request? This is a follow-up to #18073. Taking a safer approach to shutdown the pool to prevent possible issue. Also using `ThreadUtils.newForkJoinPool` instead to set a better thread name. ## How was this patch tested? Manually test. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi HsiehCloses #18100 from viirya/SPARK-20848-followup. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6b68d61c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6b68d61c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6b68d61c Branch: refs/heads/master Commit: 6b68d61cf31748a088778dfdd66491b2f89a3c7b Parents: 197f901 Author: Liang-Chi Hsieh Authored: Thu May 25 09:55:45 2017 +0800 Committer: Wenchen Fan Committed: Thu May 25 09:55:45 2017 +0800 -- .../datasources/parquet/ParquetFileFormat.scala | 42 ++-- 1 file changed, 22 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6b68d61c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 29ed890..87fbf8b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -50,7 +50,7 @@ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ -import org.apache.spark.util.SerializableConfiguration +import org.apache.spark.util.{SerializableConfiguration, ThreadUtils} class ParquetFileFormat extends FileFormat @@ -479,27 +479,29 @@ object ParquetFileFormat extends Logging { partFiles: Seq[FileStatus], ignoreCorruptFiles: Boolean): Seq[Footer] = { val parFiles = partFiles.par -val pool = new ForkJoinPool(8) +val pool = ThreadUtils.newForkJoinPool("readingParquetFooters", 8) parFiles.tasksupport = new ForkJoinTaskSupport(pool) -parFiles.flatMap { currentFile => - try { -// Skips row group information since we only need the schema. -// ParquetFileReader.readFooter throws RuntimeException, instead of IOException, -// when it can't read the footer. -Some(new Footer(currentFile.getPath(), - ParquetFileReader.readFooter( -conf, currentFile, SKIP_ROW_GROUPS))) - } catch { case e: RuntimeException => -if (ignoreCorruptFiles) { - logWarning(s"Skipped the footer in the corrupted file: $currentFile", e) - None -} else { - throw new IOException(s"Could not read footer for file: $currentFile", e) +try { + parFiles.flatMap { currentFile => +try { + // Skips row group information since we only need the schema. + // ParquetFileReader.readFooter throws RuntimeException, instead of IOException, + // when it can't read the footer. 
+ Some(new Footer(currentFile.getPath(), +ParquetFileReader.readFooter( + conf, currentFile, SKIP_ROW_GROUPS))) +} catch { case e: RuntimeException => + if (ignoreCorruptFiles) { +logWarning(s"Skipped the footer in the corrupted file: $currentFile", e) +None + } else { +throw new IOException(s"Could not read footer for file: $currentFile", e) + } } - } finally { -pool.shutdown() - } -}.seq + }.seq +} finally { + pool.shutdown() +} } /** - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20403][SQL] Modify the instructions of some functions
Repository: spark Updated Branches: refs/heads/branch-2.2 ae65d3014 -> 3f82d65bf

[SPARK-20403][SQL] Modify the instructions of some functions

## What changes were proposed in this pull request?
1. Add instructions for the 'cast' function, shown by the 'show functions' and 'desc function cast' commands in spark-sql.
2. Modify the instructions of functions such as boolean, tinyint, smallint, int, bigint, float, double, decimal, date, timestamp, binary, string.

## How was this patch tested?
Before modification:
spark-sql> desc function boolean;
Function: boolean
Class: org.apache.spark.sql.catalyst.expressions.Cast
Usage: boolean(expr AS type) - Casts the value `expr` to the target data type `type`.

After modification:
spark-sql> desc function boolean;
Function: boolean
Class: org.apache.spark.sql.catalyst.expressions.Cast
Usage: boolean(expr) - Casts the value `expr` to the target data type `boolean`.

spark-sql> desc function cast
Function: cast
Class: org.apache.spark.sql.catalyst.expressions.Cast
Usage: cast(expr AS type) - Casts the value `expr` to the target data type `type`.

Author: liuxian

Closes #17698 from 10110346/wip_lx_0418.

(cherry picked from commit 197f9018a4641c8fc0725905ebfb535b61bed791)
Signed-off-by: Xiao Li

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3f82d65b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3f82d65b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3f82d65b Branch: refs/heads/branch-2.2 Commit: 3f82d65bf6a628b0d46bb2eded9ed12f1d5aa9d2 Parents: ae65d30 Author: liuxian Authored: Wed May 24 17:32:02 2017 -0700 Committer: Xiao Li Committed: Wed May 24 17:32:18 2017 -0700

.../catalyst/analysis/FunctionRegistry.scala| 6 -
.../catalyst/expressions/mathExpressions.scala | 2 +-
.../test/resources/sql-tests/inputs/cast.sql| 2 ++
.../resources/sql-tests/results/cast.sql.out| 23 +++-
4 files changed, 30 insertions(+), 3 deletions(-)

http://git-wip-us.apache.org/repos/asf/spark/blob/3f82d65b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 6fc154f..96b6b11 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -430,6 +430,8 @@ object FunctionRegistry { expression[StructsToJson]("to_json"), expression[JsonToStructs]("from_json"), +// cast +expression[Cast]("cast"), // Cast aliases (SPARK-16730) castAlias("boolean", BooleanType), castAlias("tinyint", ByteType), @@ -512,7 +514,9 @@ object FunctionRegistry { } Cast(args.head, dataType) } -(name, (expressionInfo[Cast](name), builder)) +val clazz = scala.reflect.classTag[Cast].runtimeClass +val usage = "_FUNC_(expr) - Casts the value `expr` to the target data type `_FUNC_`."
+(name, (new ExpressionInfo(clazz.getCanonicalName, null, name, usage, null), builder)) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/3f82d65b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala index de1a46d..e040ad0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala @@ -966,7 +966,7 @@ case class Logarithm(left: Expression, right: Expression) * * @param child expr to be round, all [[NumericType]] is allowed as Input * @param scale new scale to be round to, this should be a constant int at runtime - * @param mode rounding mode (e.g. HALF_UP, HALF_UP) + * @param mode rounding mode (e.g. HALF_UP, HALF_EVEN) * @param modeStr rounding mode string name (e.g. "ROUND_HALF_UP", "ROUND_HALF_EVEN") */ abstract class RoundBase(child: Expression, scale: Expression, http://git-wip-us.apache.org/repos/asf/spark/blob/3f82d65b/sql/core/src/test/resources/sql-tests/inputs/cast.sql -- diff --git
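As an aside, the effect of the new usage string can be illustrated with a self-contained sketch (simplified, not Spark's ExpressionInfo): the `_FUNC_` placeholder in the usage text is substituted with the name each alias is registered under, which is what yields the per-type descriptions seen in the `desc function boolean` output above.

object CastAliasUsageSketch {
  private val template =
    "_FUNC_(expr) - Casts the value `expr` to the target data type `_FUNC_`."

  // Substitute the placeholder with the registered alias name.
  def usageFor(alias: String): String = template.replace("_FUNC_", alias)

  def main(args: Array[String]): Unit = {
    Seq("boolean", "tinyint", "date", "string").foreach(a => println(usageFor(a)))
    // boolean(expr) - Casts the value `expr` to the target data type `boolean`.
    // tinyint(expr) - Casts the value `expr` to the target data type `tinyint`.
    // ...
  }
}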
spark git commit: [SPARK-20403][SQL] Modify the instructions of some functions
Repository: spark Updated Branches: refs/heads/master 5f8ff2fc9 -> 197f9018a

[SPARK-20403][SQL] Modify the instructions of some functions

## What changes were proposed in this pull request?
1. Add instructions for the 'cast' function, shown by the 'show functions' and 'desc function cast' commands in spark-sql.
2. Modify the instructions of functions such as boolean, tinyint, smallint, int, bigint, float, double, decimal, date, timestamp, binary, string.

## How was this patch tested?
Before modification:
spark-sql> desc function boolean;
Function: boolean
Class: org.apache.spark.sql.catalyst.expressions.Cast
Usage: boolean(expr AS type) - Casts the value `expr` to the target data type `type`.

After modification:
spark-sql> desc function boolean;
Function: boolean
Class: org.apache.spark.sql.catalyst.expressions.Cast
Usage: boolean(expr) - Casts the value `expr` to the target data type `boolean`.

spark-sql> desc function cast
Function: cast
Class: org.apache.spark.sql.catalyst.expressions.Cast
Usage: cast(expr AS type) - Casts the value `expr` to the target data type `type`.

Author: liuxian

Closes #17698 from 10110346/wip_lx_0418.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/197f9018 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/197f9018 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/197f9018 Branch: refs/heads/master Commit: 197f9018a4641c8fc0725905ebfb535b61bed791 Parents: 5f8ff2f Author: liuxian Authored: Wed May 24 17:32:02 2017 -0700 Committer: Xiao Li Committed: Wed May 24 17:32:02 2017 -0700

.../catalyst/analysis/FunctionRegistry.scala| 6 -
.../catalyst/expressions/mathExpressions.scala | 2 +-
.../test/resources/sql-tests/inputs/cast.sql| 2 ++
.../resources/sql-tests/results/cast.sql.out| 23 +++-
4 files changed, 30 insertions(+), 3 deletions(-)

http://git-wip-us.apache.org/repos/asf/spark/blob/197f9018/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index d2042ad..7521a7e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -431,6 +431,8 @@ object FunctionRegistry { expression[StructsToJson]("to_json"), expression[JsonToStructs]("from_json"), +// cast +expression[Cast]("cast"), // Cast aliases (SPARK-16730) castAlias("boolean", BooleanType), castAlias("tinyint", ByteType), @@ -513,7 +515,9 @@ object FunctionRegistry { } Cast(args.head, dataType) } -(name, (expressionInfo[Cast](name), builder)) +val clazz = scala.reflect.classTag[Cast].runtimeClass +val usage = "_FUNC_(expr) - Casts the value `expr` to the target data type `_FUNC_`."
+(name, (new ExpressionInfo(clazz.getCanonicalName, null, name, usage, null), builder)) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/197f9018/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala index bf46a39..754b5c4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala @@ -982,7 +982,7 @@ case class Logarithm(left: Expression, right: Expression) * * @param child expr to be round, all [[NumericType]] is allowed as Input * @param scale new scale to be round to, this should be a constant int at runtime - * @param mode rounding mode (e.g. HALF_UP, HALF_UP) + * @param mode rounding mode (e.g. HALF_UP, HALF_EVEN) * @param modeStr rounding mode string name (e.g. "ROUND_HALF_UP", "ROUND_HALF_EVEN") */ abstract class RoundBase(child: Expression, scale: Expression, http://git-wip-us.apache.org/repos/asf/spark/blob/197f9018/sql/core/src/test/resources/sql-tests/inputs/cast.sql -- diff --git a/sql/core/src/test/resources/sql-tests/inputs/cast.sql b/sql/core/src/test/resources/sql-tests/inputs/cast.sql index
spark git commit: [SPARK-18406][CORE][BACKPORT-2.1] Race between end-of-task and completion iterator read lock release
Repository: spark Updated Branches: refs/heads/branch-2.1 2f68631f5 -> c3302e81e [SPARK-18406][CORE][BACKPORT-2.1] Race between end-of-task and completion iterator read lock release This is a backport PR of #18076 to 2.1. ## What changes were proposed in this pull request? When a TaskContext is not propagated properly to all child threads for the task, just like the reported cases in this issue, we fail to get to TID from TaskContext and that causes unable to release the lock and assertion failures. To resolve this, we have to explicitly pass the TID value to the `unlock` method. ## How was this patch tested? Add new failing regression test case in `RDDSuite`. Author: Xingbo JiangCloses #18099 from jiangxb1987/completion-iterator-2.1. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c3302e81 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c3302e81 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c3302e81 Branch: refs/heads/branch-2.1 Commit: c3302e81e3e74744ec8da66b18f81c233a15e918 Parents: 2f68631 Author: Xingbo Jiang Authored: Thu May 25 08:31:04 2017 +0800 Committer: Wenchen Fan Committed: Thu May 25 08:31:04 2017 +0800 -- .../apache/spark/network/BlockDataManager.scala | 2 +- .../apache/spark/storage/BlockInfoManager.scala | 15 +- .../org/apache/spark/storage/BlockManager.scala | 21 +++- .../scala/org/apache/spark/rdd/RDDSuite.scala | 18 - 4 files changed, 44 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c3302e81/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala index 8f83668..b3f8bfe 100644 --- a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala +++ b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala @@ -46,5 +46,5 @@ trait BlockDataManager { /** * Release locks acquired by [[putBlockData()]] and [[getBlockData()]]. */ - def releaseLock(blockId: BlockId): Unit + def releaseLock(blockId: BlockId, taskAttemptId: Option[Long]): Unit } http://git-wip-us.apache.org/repos/asf/spark/blob/c3302e81/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala index dd8f5ba..c0e18e5 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala @@ -281,22 +281,27 @@ private[storage] class BlockInfoManager extends Logging { /** * Release a lock on the given block. + * In case a TaskContext is not propagated properly to all child threads for the task, we fail to + * get the TID from TaskContext, so we have to explicitly pass the TID value to release the lock. + * + * See SPARK-18406 for more discussion of this issue. 
*/ - def unlock(blockId: BlockId): Unit = synchronized { -logTrace(s"Task $currentTaskAttemptId releasing lock for $blockId") + def unlock(blockId: BlockId, taskAttemptId: Option[TaskAttemptId] = None): Unit = synchronized { +val taskId = taskAttemptId.getOrElse(currentTaskAttemptId) +logTrace(s"Task $taskId releasing lock for $blockId") val info = get(blockId).getOrElse { throw new IllegalStateException(s"Block $blockId not found") } if (info.writerTask != BlockInfo.NO_WRITER) { info.writerTask = BlockInfo.NO_WRITER - writeLocksByTask.removeBinding(currentTaskAttemptId, blockId) + writeLocksByTask.removeBinding(taskId, blockId) } else { assert(info.readerCount > 0, s"Block $blockId is not locked for reading") info.readerCount -= 1 - val countsForTask = readLocksByTask(currentTaskAttemptId) + val countsForTask = readLocksByTask(taskId) val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1 assert(newPinCountForTask >= 0, -s"Task $currentTaskAttemptId release lock on block $blockId more times than it acquired it") +s"Task $taskId release lock on block $blockId more times than it acquired it") } notifyAll() } http://git-wip-us.apache.org/repos/asf/spark/blob/c3302e81/core/src/main/scala/org/apache/spark/storage/BlockManager.scala -- diff --git
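The essence of the fix is the optional, explicitly supplied task id with a fallback to the thread-derived one. A standalone sketch of that shape (hypothetical names, not the real BlockInfoManager):

object ReadLockManagerSketch {
  private val NON_TASK_WRITER = -1024L

  // Stand-in for TaskContext: set on the task's main thread, but not automatically
  // propagated to child threads the task spawns.
  val taskContext = new ThreadLocal[java.lang.Long]()

  private def currentTaskAttemptId: Long =
    Option(taskContext.get()).map(_.longValue).getOrElse(NON_TASK_WRITER)

  private val readerCount = scala.collection.mutable.Map.empty[String, Int].withDefaultValue(0)

  def lockForReading(blockId: String): Unit = synchronized { readerCount(blockId) += 1 }

  // Callers that may run on a child thread pass the task id they captured up front;
  // everyone else keeps relying on the thread-local lookup, exactly as before.
  def unlock(blockId: String, taskAttemptId: Option[Long] = None): Unit = synchronized {
    val taskId = taskAttemptId.getOrElse(currentTaskAttemptId)
    assert(readerCount(blockId) > 0,
      s"Task $taskId released lock on block $blockId more times than it acquired it")
    readerCount(blockId) -= 1
  }
}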
spark git commit: [SPARK-16202][SQL][DOC] Follow-up to Correct The Description of CreatableRelationProvider's createRelation
Repository: spark Updated Branches: refs/heads/branch-2.2 2405afce4 -> ae65d3014 [SPARK-16202][SQL][DOC] Follow-up to Correct The Description of CreatableRelationProvider's createRelation ## What changes were proposed in this pull request? Follow-up to SPARK-16202: 1. Remove the duplication of the meaning of `SaveMode` (as one was in fact missing that had proven that the duplication may be incomplete in the future again) 2. Use standard scaladoc tags /cc gatorsmile rxin yhuai (as they were involved previously) ## How was this patch tested? local build Author: Jacek LaskowskiCloses #18026 from jaceklaskowski/CreatableRelationProvider-SPARK-16202. (cherry picked from commit 5f8ff2fc9a859ceeaa8f1d03060fdbb30951e706) Signed-off-by: Xiao Li Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ae65d301 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ae65d301 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ae65d301 Branch: refs/heads/branch-2.2 Commit: ae65d3014941344a924da583959e6b4b1d1d64f2 Parents: 2405afc Author: Jacek Laskowski Authored: Wed May 24 17:24:23 2017 -0700 Committer: Xiao Li Committed: Wed May 24 17:24:33 2017 -0700 -- .../org/apache/spark/sql/sources/interfaces.scala | 17 +++-- 1 file changed, 7 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ae65d301/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index ff8b15b..86eeb2f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -163,16 +163,13 @@ trait StreamSinkProvider { @InterfaceStability.Stable trait CreatableRelationProvider { /** - * Save the DataFrame to the destination and return a relation with the given parameters based on - * the contents of the given DataFrame. The mode specifies the expected behavior of createRelation - * when data already exists. - * Right now, there are three modes, Append, Overwrite, and ErrorIfExists. - * Append mode means that when saving a DataFrame to a data source, if data already exists, - * contents of the DataFrame are expected to be appended to existing data. - * Overwrite mode means that when saving a DataFrame to a data source, if data already exists, - * existing data is expected to be overwritten by the contents of the DataFrame. - * ErrorIfExists mode means that when saving a DataFrame to a data source, - * if data already exists, an exception is expected to be thrown. + * Saves a DataFrame to a destination (using data source-specific parameters) + * + * @param sqlContext SQLContext + * @param mode specifies what happens when the destination already exists + * @param parameters data source-specific parameters + * @param data DataFrame to save (i.e. the rows after executing the query) + * @return Relation with a known schema * * @since 1.3.0 */ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
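For readers who relied on the removed prose, the behaviour it described can be summarised in a short self-contained sketch (plain Scala, not the Spark API): Append adds to existing data, Overwrite replaces it, and ErrorIfExists fails when the destination is already populated.

object SaveModeSketch {
  sealed trait SaveMode
  case object Append extends SaveMode
  case object Overwrite extends SaveMode
  case object ErrorIfExists extends SaveMode

  // destination name -> rows already stored there
  private val store = scala.collection.mutable.Map.empty[String, Vector[String]]

  def save(dest: String, rows: Vector[String], mode: SaveMode): Unit = mode match {
    case Append        => store(dest) = store.getOrElse(dest, Vector.empty) ++ rows
    case Overwrite     => store(dest) = rows
    case ErrorIfExists =>
      if (store.contains(dest)) throw new IllegalStateException(s"$dest already exists")
      store(dest) = rows
  }

  def main(args: Array[String]): Unit = {
    save("t", Vector("a"), ErrorIfExists)   // ok: destination did not exist yet
    save("t", Vector("b"), Append)          // t -> Vector(a, b)
    save("t", Vector("c"), Overwrite)       // t -> Vector(c)
    save("t", Vector("d"), ErrorIfExists)   // throws IllegalStateException
  }
}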
spark git commit: [SPARK-16202][SQL][DOC] Follow-up to Correct The Description of CreatableRelationProvider's createRelation
Repository: spark Updated Branches: refs/heads/master c0b3e45e3 -> 5f8ff2fc9 [SPARK-16202][SQL][DOC] Follow-up to Correct The Description of CreatableRelationProvider's createRelation ## What changes were proposed in this pull request? Follow-up to SPARK-16202: 1. Remove the duplication of the meaning of `SaveMode` (as one was in fact missing that had proven that the duplication may be incomplete in the future again) 2. Use standard scaladoc tags /cc gatorsmile rxin yhuai (as they were involved previously) ## How was this patch tested? local build Author: Jacek LaskowskiCloses #18026 from jaceklaskowski/CreatableRelationProvider-SPARK-16202. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5f8ff2fc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5f8ff2fc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5f8ff2fc Branch: refs/heads/master Commit: 5f8ff2fc9a859ceeaa8f1d03060fdbb30951e706 Parents: c0b3e45 Author: Jacek Laskowski Authored: Wed May 24 17:24:23 2017 -0700 Committer: Xiao Li Committed: Wed May 24 17:24:23 2017 -0700 -- .../org/apache/spark/sql/sources/interfaces.scala | 17 +++-- 1 file changed, 7 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5f8ff2fc/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index ff8b15b..86eeb2f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -163,16 +163,13 @@ trait StreamSinkProvider { @InterfaceStability.Stable trait CreatableRelationProvider { /** - * Save the DataFrame to the destination and return a relation with the given parameters based on - * the contents of the given DataFrame. The mode specifies the expected behavior of createRelation - * when data already exists. - * Right now, there are three modes, Append, Overwrite, and ErrorIfExists. - * Append mode means that when saving a DataFrame to a data source, if data already exists, - * contents of the DataFrame are expected to be appended to existing data. - * Overwrite mode means that when saving a DataFrame to a data source, if data already exists, - * existing data is expected to be overwritten by the contents of the DataFrame. - * ErrorIfExists mode means that when saving a DataFrame to a data source, - * if data already exists, an exception is expected to be thrown. + * Saves a DataFrame to a destination (using data source-specific parameters) + * + * @param sqlContext SQLContext + * @param mode specifies what happens when the destination already exists + * @param parameters data source-specific parameters + * @param data DataFrame to save (i.e. the rows after executing the query) + * @return Relation with a known schema * * @since 1.3.0 */ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20872][SQL] ShuffleExchange.nodeName should handle null coordinator
Repository: spark Updated Branches: refs/heads/branch-2.2 b7a2a16b1 -> 2405afce4 [SPARK-20872][SQL] ShuffleExchange.nodeName should handle null coordinator ## What changes were proposed in this pull request? A one-liner change in `ShuffleExchange.nodeName` to cover the case when `coordinator` is `null`, so that the match expression is exhaustive. Please refer to [SPARK-20872](https://issues.apache.org/jira/browse/SPARK-20872) for a description of the symptoms. TL;DR is that inspecting a `ShuffleExchange` (directly or transitively) on the Executor side can hit a case where the `coordinator` field of a `ShuffleExchange` is null, and thus will trigger a `MatchError` in `ShuffleExchange.nodeName()`'s inexhaustive match expression. Also changed two other match conditions in `ShuffleExchange` on the `coordinator` field to be consistent. ## How was this patch tested? Manually tested this change with a case where the `coordinator` is null to make sure `ShuffleExchange.nodeName` doesn't throw a `MatchError` any more. Author: Kris MokCloses #18095 from rednaxelafx/shuffleexchange-nodename. (cherry picked from commit c0b3e45e3b46a5235b748cb85ad200c9ec1bb426) Signed-off-by: Xiao Li Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2405afce Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2405afce Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2405afce Branch: refs/heads/branch-2.2 Commit: 2405afce4e87c0486f2aef1d068f17aea2480b17 Parents: b7a2a16 Author: Kris Mok Authored: Wed May 24 17:19:35 2017 -0700 Committer: Xiao Li Committed: Wed May 24 17:19:46 2017 -0700 -- .../spark/sql/execution/exchange/ShuffleExchange.scala | 9 ++--- 1 file changed, 6 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2405afce/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala index f06544e..eebe6ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala @@ -40,6 +40,9 @@ case class ShuffleExchange( child: SparkPlan, @transient coordinator: Option[ExchangeCoordinator]) extends Exchange { + // NOTE: coordinator can be null after serialization/deserialization, + // e.g. it can be null on the Executor side + override lazy val metrics = Map( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")) @@ -47,7 +50,7 @@ case class ShuffleExchange( val extraInfo = coordinator match { case Some(exchangeCoordinator) => s"(coordinator id: ${System.identityHashCode(exchangeCoordinator)})" - case None => "" + case _ => "" } val simpleNodeName = "Exchange" @@ -70,7 +73,7 @@ case class ShuffleExchange( // the plan. 
coordinator match { case Some(exchangeCoordinator) => exchangeCoordinator.registerExchange(this) - case None => + case _ => } } @@ -117,7 +120,7 @@ case class ShuffleExchange( val shuffleRDD = exchangeCoordinator.postShuffleRDD(this) assert(shuffleRDD.partitions.length == newPartitioning.numPartitions) shuffleRDD -case None => +case _ => val shuffleDependency = prepareShuffleDependency() preparePostShuffleRDD(shuffleDependency) } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
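The symptom is easy to reproduce outside Spark. A minimal sketch (standalone, not the ShuffleExchange code): after deserialization a @transient Option field can come back as null, and a match with only Some/None arms throws scala.MatchError, while a trailing wildcard absorbs it.

object NullOptionMatchSketch {
  def nodeNameSuffix(coordinator: Option[String]): String = coordinator match {
    case Some(id) => s"(coordinator id: $id)"
    case None     => ""   // matches None, but NOT a null reference
  }

  def nodeNameSuffixSafe(coordinator: Option[String]): String = coordinator match {
    case Some(id) => s"(coordinator id: $id)"
    case _        => ""   // also covers a null field seen on the Executor side
  }

  def main(args: Array[String]): Unit = {
    println(nodeNameSuffixSafe(null))            // prints an empty suffix
    try nodeNameSuffix(null) catch {
      case e: MatchError => println(s"inexhaustive match: $e")
    }
  }
}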
spark git commit: [SPARK-20872][SQL] ShuffleExchange.nodeName should handle null coordinator
Repository: spark Updated Branches: refs/heads/master 95aef660b -> c0b3e45e3 [SPARK-20872][SQL] ShuffleExchange.nodeName should handle null coordinator ## What changes were proposed in this pull request? A one-liner change in `ShuffleExchange.nodeName` to cover the case when `coordinator` is `null`, so that the match expression is exhaustive. Please refer to [SPARK-20872](https://issues.apache.org/jira/browse/SPARK-20872) for a description of the symptoms. TL;DR is that inspecting a `ShuffleExchange` (directly or transitively) on the Executor side can hit a case where the `coordinator` field of a `ShuffleExchange` is null, and thus will trigger a `MatchError` in `ShuffleExchange.nodeName()`'s inexhaustive match expression. Also changed two other match conditions in `ShuffleExchange` on the `coordinator` field to be consistent. ## How was this patch tested? Manually tested this change with a case where the `coordinator` is null to make sure `ShuffleExchange.nodeName` doesn't throw a `MatchError` any more. Author: Kris MokCloses #18095 from rednaxelafx/shuffleexchange-nodename. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c0b3e45e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c0b3e45e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c0b3e45e Branch: refs/heads/master Commit: c0b3e45e3b46a5235b748cb85ad200c9ec1bb426 Parents: 95aef66 Author: Kris Mok Authored: Wed May 24 17:19:35 2017 -0700 Committer: Xiao Li Committed: Wed May 24 17:19:35 2017 -0700 -- .../spark/sql/execution/exchange/ShuffleExchange.scala | 9 ++--- 1 file changed, 6 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c0b3e45e/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala index f06544e..eebe6ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala @@ -40,6 +40,9 @@ case class ShuffleExchange( child: SparkPlan, @transient coordinator: Option[ExchangeCoordinator]) extends Exchange { + // NOTE: coordinator can be null after serialization/deserialization, + // e.g. it can be null on the Executor side + override lazy val metrics = Map( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")) @@ -47,7 +50,7 @@ case class ShuffleExchange( val extraInfo = coordinator match { case Some(exchangeCoordinator) => s"(coordinator id: ${System.identityHashCode(exchangeCoordinator)})" - case None => "" + case _ => "" } val simpleNodeName = "Exchange" @@ -70,7 +73,7 @@ case class ShuffleExchange( // the plan. coordinator match { case Some(exchangeCoordinator) => exchangeCoordinator.registerExchange(this) - case None => + case _ => } } @@ -117,7 +120,7 @@ case class ShuffleExchange( val shuffleRDD = exchangeCoordinator.postShuffleRDD(this) assert(shuffleRDD.partitions.length == newPartitioning.numPartitions) shuffleRDD -case None => +case _ => val shuffleDependency = prepareShuffleDependency() preparePostShuffleRDD(shuffleDependency) } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20205][CORE] Make sure StageInfo is updated before sending event.
Repository: spark Updated Branches: refs/heads/master a64746677 -> 95aef660b [SPARK-20205][CORE] Make sure StageInfo is updated before sending event. The DAGScheduler was sending a "stage submitted" event before it properly updated the event's information. This meant that a listener (e.g. the even logging listener) could record wrong information about the event. This change sets the stage's submission time before the event is submitted, when there are tasks to be executed in the stage. Tested with existing unit tests. Author: Marcelo VanzinCloses #17925 from vanzin/SPARK-20205. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95aef660 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95aef660 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95aef660 Branch: refs/heads/master Commit: 95aef660b73ec931e746d1ec8ae7848762ba0d7c Parents: a647466 Author: Marcelo Vanzin Authored: Wed May 24 16:57:17 2017 -0700 Committer: Marcelo Vanzin Committed: Wed May 24 16:57:17 2017 -0700 -- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 8 +++- 1 file changed, 7 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/95aef660/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 875acc3..ab2255f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -983,6 +983,13 @@ class DAGScheduler( } stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq) + +// If there are tasks to execute, record the submission time of the stage. Otherwise, +// post the even without the submission time, which indicates that this stage was +// skipped. +if (partitionsToCompute.nonEmpty) { + stage.latestInfo.submissionTime = Some(clock.getTimeMillis()) +} listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties)) // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times. @@ -1054,7 +1061,6 @@ class DAGScheduler( s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})") taskScheduler.submitTasks(new TaskSet( tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties)) - stage.latestInfo.submissionTime = Some(clock.getTimeMillis()) } else { // Because we posted SparkListenerStageSubmitted earlier, we should mark // the stage as completed here in case there are no tasks to run - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
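A small sketch of why the ordering matters (hypothetical stand-ins for StageInfo and the listener bus, not the DAGScheduler itself): a listener that records the event at post time only sees the submission time if the field was filled in before the post, which is exactly what this change moves.

object EventOrderingSketch {
  class StageInfo(val stageId: Int) { var submissionTime: Option[Long] = None }
  final case class StageSubmitted(info: StageInfo)

  class RecordingListener {
    val seen = scala.collection.mutable.Buffer.empty[String]
    // Records what the event looks like *at post time*, like an event-logging listener.
    def post(event: StageSubmitted): Unit =
      seen += s"stage ${event.info.stageId} submitted at ${event.info.submissionTime}"
  }

  def main(args: Array[String]): Unit = {
    val bus = new RecordingListener
    val stage = new StageInfo(stageId = 1)

    // Fixed order: when there are tasks to run, set the time first, then post.
    stage.submissionTime = Some(System.currentTimeMillis())
    bus.post(StageSubmitted(stage))

    // With the old order (post first, set afterwards) the listener would have
    // recorded "submitted at None" even though the stage was actually submitted.
    println(bus.seen.mkString("\n"))
  }
}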
spark git commit: [SPARK-18406][CORE][BACKPORT-2.0] Race between end-of-task and completion iterator read lock release
Repository: spark Updated Branches: refs/heads/branch-2.0 72e1f83d7 -> 79fbfbbc7 [SPARK-18406][CORE][BACKPORT-2.0] Race between end-of-task and completion iterator read lock release This is a backport PR of #18076 to 2.0 and 2.1. ## What changes were proposed in this pull request? When a TaskContext is not propagated properly to all child threads for the task, just like the reported cases in this issue, we fail to get to TID from TaskContext and that causes unable to release the lock and assertion failures. To resolve this, we have to explicitly pass the TID value to the `unlock` method. ## How was this patch tested? Add new failing regression test case in `RDDSuite`. Author: Xingbo JiangCloses #18096 from jiangxb1987/completion-iterator-2.0. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/79fbfbbc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/79fbfbbc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/79fbfbbc Branch: refs/heads/branch-2.0 Commit: 79fbfbbc7ad7fea1ca4124981201e947db67745d Parents: 72e1f83 Author: Xingbo Jiang Authored: Wed May 24 14:34:17 2017 -0700 Committer: Xiao Li Committed: Wed May 24 14:34:17 2017 -0700 -- .../apache/spark/network/BlockDataManager.scala | 2 +- .../apache/spark/storage/BlockInfoManager.scala | 15 +- .../org/apache/spark/storage/BlockManager.scala | 21 +++- .../scala/org/apache/spark/rdd/RDDSuite.scala | 18 - 4 files changed, 44 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/79fbfbbc/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala index 8f83668..b3f8bfe 100644 --- a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala +++ b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala @@ -46,5 +46,5 @@ trait BlockDataManager { /** * Release locks acquired by [[putBlockData()]] and [[getBlockData()]]. */ - def releaseLock(blockId: BlockId): Unit + def releaseLock(blockId: BlockId, taskAttemptId: Option[Long]): Unit } http://git-wip-us.apache.org/repos/asf/spark/blob/79fbfbbc/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala index dd8f5ba..c0e18e5 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala @@ -281,22 +281,27 @@ private[storage] class BlockInfoManager extends Logging { /** * Release a lock on the given block. + * In case a TaskContext is not propagated properly to all child threads for the task, we fail to + * get the TID from TaskContext, so we have to explicitly pass the TID value to release the lock. + * + * See SPARK-18406 for more discussion of this issue. 
*/ - def unlock(blockId: BlockId): Unit = synchronized { -logTrace(s"Task $currentTaskAttemptId releasing lock for $blockId") + def unlock(blockId: BlockId, taskAttemptId: Option[TaskAttemptId] = None): Unit = synchronized { +val taskId = taskAttemptId.getOrElse(currentTaskAttemptId) +logTrace(s"Task $taskId releasing lock for $blockId") val info = get(blockId).getOrElse { throw new IllegalStateException(s"Block $blockId not found") } if (info.writerTask != BlockInfo.NO_WRITER) { info.writerTask = BlockInfo.NO_WRITER - writeLocksByTask.removeBinding(currentTaskAttemptId, blockId) + writeLocksByTask.removeBinding(taskId, blockId) } else { assert(info.readerCount > 0, s"Block $blockId is not locked for reading") info.readerCount -= 1 - val countsForTask = readLocksByTask(currentTaskAttemptId) + val countsForTask = readLocksByTask(taskId) val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1 assert(newPinCountForTask >= 0, -s"Task $currentTaskAttemptId release lock on block $blockId more times than it acquired it") +s"Task $taskId release lock on block $blockId more times than it acquired it") } notifyAll() } http://git-wip-us.apache.org/repos/asf/spark/blob/79fbfbbc/core/src/main/scala/org/apache/spark/storage/BlockManager.scala -- diff --git
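Complementing the release-side sketch earlier in this digest, here is the caller side of the same idea in standalone form (hypothetical names, not the Spark code): the task attempt id is captured while still on the task's own thread, so a completion callback that later fires on a different thread can pass it to the release call explicitly instead of relying on a thread-local lookup that would come back empty.

object CompletionCallbackSketch {
  // Stand-ins: a release function taking an optional explicit id, and a
  // task-thread-only way to read the current task attempt id.
  def releaseLock(blockId: String, taskAttemptId: Option[Long]): Unit =
    println(s"released $blockId on ${Thread.currentThread.getName} for task $taskAttemptId")

  def currentTaskAttemptIdOnTaskThread: Option[Long] = Some(42L)   // illustrative value

  def main(args: Array[String]): Unit = {
    // Capture the id eagerly, while still on the task's own thread ...
    val tid = currentTaskAttemptIdOnTaskThread

    // ... so the callback releases correctly even when it runs on another thread,
    // where a TaskContext-based lookup would find nothing.
    val onCompletion: Runnable = () => releaseLock("block-0", tid)
    new Thread(onCompletion, "completion-callback").start()
  }
}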
spark git commit: [SPARK-20867][SQL] Move hints from Statistics into HintInfo class
Repository: spark Updated Branches: refs/heads/branch-2.2 c59ad420b -> b7a2a16b1 [SPARK-20867][SQL] Move hints from Statistics into HintInfo class ## What changes were proposed in this pull request? This is a follow-up to SPARK-20857 to move the broadcast hint from Statistics into a new HintInfo class, so we can be more flexible in adding new hints in the future. ## How was this patch tested? Updated test cases to reflect the change. Author: Reynold XinCloses #18087 from rxin/SPARK-20867. (cherry picked from commit a64746677bf09ef67e3fd538355a6ee9b5ce8cf4) Signed-off-by: Xiao Li Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b7a2a16b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b7a2a16b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b7a2a16b Branch: refs/heads/branch-2.2 Commit: b7a2a16b1e01375292938fc48b0a333ec4e7cd30 Parents: c59ad42 Author: Reynold Xin Authored: Wed May 24 13:57:19 2017 -0700 Committer: Xiao Li Committed: Wed May 24 13:57:28 2017 -0700 -- .../sql/catalyst/analysis/ResolveHints.scala| 6 ++--- .../catalyst/plans/logical/LogicalPlan.scala| 2 +- .../sql/catalyst/plans/logical/Statistics.scala | 11 +++-- .../plans/logical/basicLogicalOperators.scala | 17 +++--- .../sql/catalyst/plans/logical/hints.scala | 24 .../statsEstimation/AggregateEstimation.scala | 2 +- .../catalyst/analysis/ResolveHintsSuite.scala | 20 .../BasicStatsEstimationSuite.scala | 17 +++--- .../spark/sql/execution/SparkStrategies.scala | 2 +- .../scala/org/apache/spark/sql/functions.scala | 4 ++-- .../spark/sql/StatisticsCollectionSuite.scala | 2 +- 11 files changed, 59 insertions(+), 48 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b7a2a16b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala index 9dfd84c..86c788a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala @@ -57,9 +57,9 @@ object ResolveHints { val newNode = CurrentOrigin.withOrigin(plan.origin) { plan match { case u: UnresolvedRelation if toBroadcast.exists(resolver(_, u.tableIdentifier.table)) => -ResolvedHint(plan, isBroadcastable = Option(true)) +ResolvedHint(plan, HintInfo(isBroadcastable = Option(true))) case r: SubqueryAlias if toBroadcast.exists(resolver(_, r.alias)) => -ResolvedHint(plan, isBroadcastable = Option(true)) +ResolvedHint(plan, HintInfo(isBroadcastable = Option(true))) case _: ResolvedHint | _: View | _: With | _: SubqueryAlias => // Don't traverse down these nodes. @@ -88,7 +88,7 @@ object ResolveHints { case h: UnresolvedHint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) => if (h.parameters.isEmpty) { // If there is no table alias specified, turn the entire subtree into a BroadcastHint. - ResolvedHint(h.child, isBroadcastable = Option(true)) + ResolvedHint(h.child, HintInfo(isBroadcastable = Option(true))) } else { // Otherwise, find within the subtree query plans that should be broadcasted. 
applyBroadcastHint(h.child, h.parameters.toSet) http://git-wip-us.apache.org/repos/asf/spark/blob/b7a2a16b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 6bdcf49..2ebb2ff 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -347,7 +347,7 @@ abstract class UnaryNode extends LogicalPlan { } // Don't propagate rowCount and attributeStats, since they are not estimated here. -Statistics(sizeInBytes = sizeInBytes, isBroadcastable = child.stats(conf).isBroadcastable) +Statistics(sizeInBytes = sizeInBytes, hints = child.stats(conf).hints) } }
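An illustrative sketch of the refactoring direction (simplified case classes, not the Catalyst ones): keeping hints in their own HintInfo value means new hints can be added later without growing the Statistics signature, and a child's hints can be propagated as a single field, as UnaryNode now does.

case class HintInfo(isBroadcastable: Option[Boolean] = None)

case class Statistics(
    sizeInBytes: BigInt,
    rowCount: Option[BigInt] = None,
    hints: HintInfo = HintInfo())

object HintPropagationSketch {
  def main(args: Array[String]): Unit = {
    val child = Statistics(
      sizeInBytes = BigInt(8192),
      rowCount = Some(BigInt(100)),
      hints = HintInfo(isBroadcastable = Some(true)))

    // Propagate only the hints; rowCount is not estimated here, mirroring UnaryNode.
    val parent = Statistics(sizeInBytes = child.sizeInBytes * 4, hints = child.hints)
    println(parent)   // Statistics(32768,None,HintInfo(Some(true)))
  }
}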
spark git commit: [SPARK-20867][SQL] Move hints from Statistics into HintInfo class
Repository: spark Updated Branches: refs/heads/master f72ad303f -> a64746677 [SPARK-20867][SQL] Move hints from Statistics into HintInfo class ## What changes were proposed in this pull request? This is a follow-up to SPARK-20857 to move the broadcast hint from Statistics into a new HintInfo class, so we can be more flexible in adding new hints in the future. ## How was this patch tested? Updated test cases to reflect the change. Author: Reynold XinCloses #18087 from rxin/SPARK-20867. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6474667 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6474667 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6474667 Branch: refs/heads/master Commit: a64746677bf09ef67e3fd538355a6ee9b5ce8cf4 Parents: f72ad30 Author: Reynold Xin Authored: Wed May 24 13:57:19 2017 -0700 Committer: Xiao Li Committed: Wed May 24 13:57:19 2017 -0700 -- .../sql/catalyst/analysis/ResolveHints.scala| 6 ++--- .../catalyst/plans/logical/LogicalPlan.scala| 2 +- .../sql/catalyst/plans/logical/Statistics.scala | 11 +++-- .../plans/logical/basicLogicalOperators.scala | 17 +++--- .../sql/catalyst/plans/logical/hints.scala | 24 .../statsEstimation/AggregateEstimation.scala | 2 +- .../catalyst/analysis/ResolveHintsSuite.scala | 20 .../BasicStatsEstimationSuite.scala | 17 +++--- .../spark/sql/execution/SparkStrategies.scala | 2 +- .../scala/org/apache/spark/sql/functions.scala | 4 ++-- .../spark/sql/StatisticsCollectionSuite.scala | 2 +- 11 files changed, 59 insertions(+), 48 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a6474667/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala index 9dfd84c..86c788a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala @@ -57,9 +57,9 @@ object ResolveHints { val newNode = CurrentOrigin.withOrigin(plan.origin) { plan match { case u: UnresolvedRelation if toBroadcast.exists(resolver(_, u.tableIdentifier.table)) => -ResolvedHint(plan, isBroadcastable = Option(true)) +ResolvedHint(plan, HintInfo(isBroadcastable = Option(true))) case r: SubqueryAlias if toBroadcast.exists(resolver(_, r.alias)) => -ResolvedHint(plan, isBroadcastable = Option(true)) +ResolvedHint(plan, HintInfo(isBroadcastable = Option(true))) case _: ResolvedHint | _: View | _: With | _: SubqueryAlias => // Don't traverse down these nodes. @@ -88,7 +88,7 @@ object ResolveHints { case h: UnresolvedHint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) => if (h.parameters.isEmpty) { // If there is no table alias specified, turn the entire subtree into a BroadcastHint. - ResolvedHint(h.child, isBroadcastable = Option(true)) + ResolvedHint(h.child, HintInfo(isBroadcastable = Option(true))) } else { // Otherwise, find within the subtree query plans that should be broadcasted. 
applyBroadcastHint(h.child, h.parameters.toSet) http://git-wip-us.apache.org/repos/asf/spark/blob/a6474667/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 6bdcf49..2ebb2ff 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -347,7 +347,7 @@ abstract class UnaryNode extends LogicalPlan { } // Don't propagate rowCount and attributeStats, since they are not estimated here. -Statistics(sizeInBytes = sizeInBytes, isBroadcastable = child.stats(conf).isBroadcastable) +Statistics(sizeInBytes = sizeInBytes, hints = child.stats(conf).hints) } } http://git-wip-us.apache.org/repos/asf/spark/blob/a6474667/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
spark git commit: [SPARK-20848][SQL] Shutdown the pool after reading parquet files
Repository: spark Updated Branches: refs/heads/branch-2.1 13adc0fc0 -> 2f68631f5

[SPARK-20848][SQL] Shutdown the pool after reading parquet files

## What changes were proposed in this pull request?
From JIRA: On each call to spark.read.parquet, a new ForkJoinPool is created. One of the threads in the pool is kept in the WAITING state, and never stopped, which leads to unbounded growth in number of threads. We should shutdown the pool after reading parquet files.

## How was this patch tested?
Added a test to ParquetFileFormatSuite.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh

Closes #18073 from viirya/SPARK-20848.

(cherry picked from commit f72ad303f05a6d99513ea3b121375726b177199c)
Signed-off-by: Wenchen Fan

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2f68631f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2f68631f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2f68631f Branch: refs/heads/branch-2.1 Commit: 2f68631f523e4db08549497d9c3264a43137bbb1 Parents: 13adc0f Author: Liang-Chi Hsieh Authored: Thu May 25 00:35:40 2017 +0800 Committer: Wenchen Fan Committed: Thu May 25 00:36:22 2017 +0800

.../sql/execution/datasources/parquet/ParquetFileFormat.scala | 5 -
1 file changed, 4 insertions(+), 1 deletion(-)

http://git-wip-us.apache.org/repos/asf/spark/blob/2f68631f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 2b4892e..70eb01c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -496,7 +496,8 @@ object ParquetFileFormat extends Logging { partFiles: Seq[FileStatus], ignoreCorruptFiles: Boolean): Seq[Footer] = { val parFiles = partFiles.par -parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8)) +val pool = new ForkJoinPool(8) +parFiles.tasksupport = new ForkJoinTaskSupport(pool) parFiles.flatMap { currentFile => try { // Skips row group information since we only need the schema. @@ -512,6 +513,8 @@ object ParquetFileFormat extends Logging { } else { throw new IOException(s"Could not read footer for file: $currentFile", e) } + } finally { +pool.shutdown() } }.seq }
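The thread growth the JIRA text describes can be sketched outside Spark (a hedged example assuming Scala 2.12's built-in parallel collections, not the Spark code path): each call builds a fresh ForkJoinPool that is never shut down, so its worker threads are left behind.

import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

object PoolLeakSketch {
  // Mirrors the pre-fix shape: a fresh pool per call, never shut down.
  def readLengthsLeaky(files: Seq[String]): Seq[Int] = {
    val par = files.par
    par.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))   // leaked
    par.map(_.length).seq
  }

  def main(args: Array[String]): Unit = {
    (1 to 50).foreach(_ => readLengthsLeaky(Seq("a.parquet", "b.parquet")))
    // Without shutdown(), threads from the 50 abandoned pools accumulate.
    println(s"live threads: ${Thread.getAllStackTraces.keySet.size}")
  }
}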
spark git commit: [SPARK-20848][SQL] Shutdown the pool after reading parquet files
Repository: spark Updated Branches: refs/heads/master bc66a77bb -> f72ad303f [SPARK-20848][SQL] Shutdown the pool after reading parquet files ## What changes were proposed in this pull request? From JIRA: On each call to spark.read.parquet, a new ForkJoinPool is created. One of the threads in the pool is kept in the WAITING state, and never stopped, which leads to unbounded growth in number of threads. We should shutdown the pool after reading parquet files. ## How was this patch tested? Added a test to ParquetFileFormatSuite. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi Hsieh. Closes #18073 from viirya/SPARK-20848. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f72ad303 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f72ad303 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f72ad303 Branch: refs/heads/master Commit: f72ad303f05a6d99513ea3b121375726b177199c Parents: bc66a77 Author: Liang-Chi Hsieh Authored: Thu May 25 00:35:40 2017 +0800 Committer: Wenchen Fan Committed: Thu May 25 00:35:40 2017 +0800 -- .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 5 - 1 file changed, 4 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f72ad303/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 2f3a2c6..29ed890 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -479,7 +479,8 @@ object ParquetFileFormat extends Logging { partFiles: Seq[FileStatus], ignoreCorruptFiles: Boolean): Seq[Footer] = { val parFiles = partFiles.par -parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8)) +val pool = new ForkJoinPool(8) +parFiles.tasksupport = new ForkJoinTaskSupport(pool) parFiles.flatMap { currentFile => try { // Skips row group information since we only need the schema. @@ -495,6 +496,8 @@ object ParquetFileFormat extends Logging { } else { throw new IOException(s"Could not read footer for file: $currentFile", e) } + } finally { +pool.shutdown() } }.seq } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
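A minimal, self-contained sketch of the idea behind the two commits above: create the ForkJoinPool explicitly and shut it down once the parallel footer reading finishes, so repeated spark.read.parquet calls stop leaking pools. The readOneFooter helper is an illustrative stand-in, and the imports assume the Scala 2.11-era parallel-collections API that Spark 2.1/2.2 builds against.

    import scala.collection.parallel.ForkJoinTaskSupport
    import scala.concurrent.forkjoin.ForkJoinPool

    object FooterReadingSketch {
      // Illustrative stand-in for reading one Parquet footer.
      def readOneFooter(path: String): String = s"footer-of-$path"

      def readAllFooters(paths: Seq[String]): Seq[String] = {
        val parPaths = paths.par
        // Keep a handle on the pool so it can be shut down, instead of creating
        // an anonymous ForkJoinPool that nothing ever terminates.
        val pool = new ForkJoinPool(8)
        parPaths.tasksupport = new ForkJoinTaskSupport(pool)
        try {
          parPaths.map(readOneFooter).seq
        } finally {
          pool.shutdown() // release the pool's worker threads once the read is done
        }
      }

      def main(args: Array[String]): Unit =
        println(readAllFooters(Seq("part-0.parquet", "part-1.parquet")))
    }

The essential point is that the pool is a local value whose shutdown() is placed where it is guaranteed to run, so its worker threads are released even when a footer fails to read.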
spark git commit: [SPARK-20862][MLLIB][PYTHON] Avoid passing float to ndarray.reshape in LogisticRegressionModel
Repository: spark Updated Branches: refs/heads/branch-2.0 4dd34d004 -> 72e1f83d7 [SPARK-20862][MLLIB][PYTHON] Avoid passing float to ndarray.reshape in LogisticRegressionModel ## What changes were proposed in this pull request? Fixed TypeError with python3 and numpy 1.12.1. Numpy's `reshape` no longer takes floats as arguments as of 1.12. Also, python3 uses float division for `/`, we should be using `//` to ensure that `_dataWithBiasSize` doesn't get set to a float. ## How was this patch tested? Existing tests run using python3 and numpy 1.12. Author: Bago AmirbekianCloses #18081 from MrBago/BF-py3floatbug. (cherry picked from commit bc66a77bbe2120cc21bd8da25194efca4cde13c3) Signed-off-by: Yanbo Liang Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72e1f83d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72e1f83d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72e1f83d Branch: refs/heads/branch-2.0 Commit: 72e1f83d78e51b53c104d1cd101c10bbe557c047 Parents: 4dd34d0 Author: Bago Amirbekian Authored: Wed May 24 22:55:38 2017 +0800 Committer: Yanbo Liang Committed: Wed May 24 23:00:01 2017 +0800 -- python/pyspark/mllib/classification.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/72e1f83d/python/pyspark/mllib/classification.py -- diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py index 9f53ed0..e04eeb2 100644 --- a/python/pyspark/mllib/classification.py +++ b/python/pyspark/mllib/classification.py @@ -171,7 +171,7 @@ class LogisticRegressionModel(LinearClassificationModel): self._dataWithBiasSize = None self._weightsMatrix = None else: -self._dataWithBiasSize = self._coeff.size / (self._numClasses - 1) +self._dataWithBiasSize = self._coeff.size // (self._numClasses - 1) self._weightsMatrix = self._coeff.toArray().reshape(self._numClasses - 1, self._dataWithBiasSize) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20862][MLLIB][PYTHON] Avoid passing float to ndarray.reshape in LogisticRegressionModel
Repository: spark Updated Branches: refs/heads/branch-2.1 f4538c95f -> 13adc0fc0 [SPARK-20862][MLLIB][PYTHON] Avoid passing float to ndarray.reshape in LogisticRegressionModel ## What changes were proposed in this pull request? Fixed TypeError with python3 and numpy 1.12.1. Numpy's `reshape` no longer takes floats as arguments as of 1.12. Also, python3 uses float division for `/`, we should be using `//` to ensure that `_dataWithBiasSize` doesn't get set to a float. ## How was this patch tested? Existing tests run using python3 and numpy 1.12. Author: Bago AmirbekianCloses #18081 from MrBago/BF-py3floatbug. (cherry picked from commit bc66a77bbe2120cc21bd8da25194efca4cde13c3) Signed-off-by: Yanbo Liang Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/13adc0fc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/13adc0fc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/13adc0fc Branch: refs/heads/branch-2.1 Commit: 13adc0fc0e940a4ea8b703241666440357a597e3 Parents: f4538c9 Author: Bago Amirbekian Authored: Wed May 24 22:55:38 2017 +0800 Committer: Yanbo Liang Committed: Wed May 24 22:58:16 2017 +0800 -- python/pyspark/mllib/classification.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/13adc0fc/python/pyspark/mllib/classification.py -- diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py index 9f53ed0..e04eeb2 100644 --- a/python/pyspark/mllib/classification.py +++ b/python/pyspark/mllib/classification.py @@ -171,7 +171,7 @@ class LogisticRegressionModel(LinearClassificationModel): self._dataWithBiasSize = None self._weightsMatrix = None else: -self._dataWithBiasSize = self._coeff.size / (self._numClasses - 1) +self._dataWithBiasSize = self._coeff.size // (self._numClasses - 1) self._weightsMatrix = self._coeff.toArray().reshape(self._numClasses - 1, self._dataWithBiasSize) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20862][MLLIB][PYTHON] Avoid passing float to ndarray.reshape in LogisticRegressionModel
Repository: spark Updated Branches: refs/heads/branch-2.2 1d107242f -> 83aeac9e0 [SPARK-20862][MLLIB][PYTHON] Avoid passing float to ndarray.reshape in LogisticRegressionModel ## What changes were proposed in this pull request? Fixed TypeError with python3 and numpy 1.12.1. Numpy's `reshape` no longer takes floats as arguments as of 1.12. Also, python3 uses float division for `/`, we should be using `//` to ensure that `_dataWithBiasSize` doesn't get set to a float. ## How was this patch tested? Existing tests run using python3 and numpy 1.12. Author: Bago AmirbekianCloses #18081 from MrBago/BF-py3floatbug. (cherry picked from commit bc66a77bbe2120cc21bd8da25194efca4cde13c3) Signed-off-by: Yanbo Liang Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83aeac9e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83aeac9e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83aeac9e Branch: refs/heads/branch-2.2 Commit: 83aeac9e0590e99010d0af8e067822d0ed0971fe Parents: 1d10724 Author: Bago Amirbekian Authored: Wed May 24 22:55:38 2017 +0800 Committer: Yanbo Liang Committed: Wed May 24 22:56:28 2017 +0800 -- python/pyspark/mllib/classification.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/83aeac9e/python/pyspark/mllib/classification.py -- diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py index 9f53ed0..e04eeb2 100644 --- a/python/pyspark/mllib/classification.py +++ b/python/pyspark/mllib/classification.py @@ -171,7 +171,7 @@ class LogisticRegressionModel(LinearClassificationModel): self._dataWithBiasSize = None self._weightsMatrix = None else: -self._dataWithBiasSize = self._coeff.size / (self._numClasses - 1) +self._dataWithBiasSize = self._coeff.size // (self._numClasses - 1) self._weightsMatrix = self._coeff.toArray().reshape(self._numClasses - 1, self._dataWithBiasSize) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20862][MLLIB][PYTHON] Avoid passing float to ndarray.reshape in LogisticRegressionModel
Repository: spark Updated Branches: refs/heads/master 1816eb3be -> bc66a77bb [SPARK-20862][MLLIB][PYTHON] Avoid passing float to ndarray.reshape in LogisticRegressionModel ## What changes were proposed in this pull request? Fixed TypeError with python3 and numpy 1.12.1. Numpy's `reshape` no longer takes floats as arguments as of 1.12. Also, python3 uses float division for `/`, we should be using `//` to ensure that `_dataWithBiasSize` doesn't get set to a float. ## How was this patch tested? Existing tests run using python3 and numpy 1.12. Author: Bago AmirbekianCloses #18081 from MrBago/BF-py3floatbug. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc66a77b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc66a77b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc66a77b Branch: refs/heads/master Commit: bc66a77bbe2120cc21bd8da25194efca4cde13c3 Parents: 1816eb3 Author: Bago Amirbekian Authored: Wed May 24 22:55:38 2017 +0800 Committer: Yanbo Liang Committed: Wed May 24 22:55:38 2017 +0800 -- python/pyspark/mllib/classification.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bc66a77b/python/pyspark/mllib/classification.py -- diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py index 9f53ed0..e04eeb2 100644 --- a/python/pyspark/mllib/classification.py +++ b/python/pyspark/mllib/classification.py @@ -171,7 +171,7 @@ class LogisticRegressionModel(LinearClassificationModel): self._dataWithBiasSize = None self._weightsMatrix = None else: -self._dataWithBiasSize = self._coeff.size / (self._numClasses - 1) +self._dataWithBiasSize = self._coeff.size // (self._numClasses - 1) self._weightsMatrix = self._coeff.toArray().reshape(self._numClasses - 1, self._dataWithBiasSize) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20631][FOLLOW-UP] Fix incorrect tests.
Repository: spark Updated Branches: refs/heads/branch-2.2 e936a96ba -> 1d107242f [SPARK-20631][FOLLOW-UP] Fix incorrect tests. ## What changes were proposed in this pull request? - Fix incorrect tests for `_check_thresholds`. - Move test to `ParamTests`. ## How was this patch tested? Unit tests. Author: zero323Closes #18085 from zero323/SPARK-20631-FOLLOW-UP. (cherry picked from commit 1816eb3bef930407dc9e083de08f5105725c55d1) Signed-off-by: Yanbo Liang Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1d107242 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1d107242 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1d107242 Branch: refs/heads/branch-2.2 Commit: 1d107242f8ec842c009e0b427f6e4a8313d99aa2 Parents: e936a96 Author: zero323 Authored: Wed May 24 19:57:44 2017 +0800 Committer: Yanbo Liang Committed: Wed May 24 19:58:40 2017 +0800 -- python/pyspark/ml/tests.py | 24 1 file changed, 12 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1d107242/python/pyspark/ml/tests.py -- diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index a3393c6..0daf29d 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -404,6 +404,18 @@ class ParamTests(PySparkTestCase): self.assertEqual(tp._paramMap, copied_no_extra) self.assertEqual(tp._defaultParamMap, tp_copy._defaultParamMap) +def test_logistic_regression_check_thresholds(self): +self.assertIsInstance( +LogisticRegression(threshold=0.5, thresholds=[0.5, 0.5]), +LogisticRegression +) + +self.assertRaisesRegexp( +ValueError, +"Logistic Regression getThreshold found inconsistent.*$", +LogisticRegression, threshold=0.42, thresholds=[0.5, 0.5] +) + class EvaluatorTests(SparkSessionTestCase): @@ -807,18 +819,6 @@ class PersistenceTest(SparkSessionTestCase): except OSError: pass -def logistic_regression_check_thresholds(self): -self.assertIsInstance( -LogisticRegression(threshold=0.5, thresholds=[0.5, 0.5]), -LogisticRegressionModel -) - -self.assertRaisesRegexp( -ValueError, -"Logistic Regression getThreshold found inconsistent.*$", -LogisticRegression, threshold=0.42, thresholds=[0.5, 0.5] -) - def _compare_params(self, m1, m2, param): """ Compare 2 ML Params instances for the given param, and assert both have the same param value - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20631][FOLLOW-UP] Fix incorrect tests.
Repository: spark Updated Branches: refs/heads/master 9afcf127d -> 1816eb3be [SPARK-20631][FOLLOW-UP] Fix incorrect tests. ## What changes were proposed in this pull request? - Fix incorrect tests for `_check_thresholds`. - Move test to `ParamTests`. ## How was this patch tested? Unit tests. Author: zero323Closes #18085 from zero323/SPARK-20631-FOLLOW-UP. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1816eb3b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1816eb3b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1816eb3b Branch: refs/heads/master Commit: 1816eb3bef930407dc9e083de08f5105725c55d1 Parents: 9afcf12 Author: zero323 Authored: Wed May 24 19:57:44 2017 +0800 Committer: Yanbo Liang Committed: Wed May 24 19:57:44 2017 +0800 -- python/pyspark/ml/tests.py | 24 1 file changed, 12 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1816eb3b/python/pyspark/ml/tests.py -- diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index a3393c6..0daf29d 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -404,6 +404,18 @@ class ParamTests(PySparkTestCase): self.assertEqual(tp._paramMap, copied_no_extra) self.assertEqual(tp._defaultParamMap, tp_copy._defaultParamMap) +def test_logistic_regression_check_thresholds(self): +self.assertIsInstance( +LogisticRegression(threshold=0.5, thresholds=[0.5, 0.5]), +LogisticRegression +) + +self.assertRaisesRegexp( +ValueError, +"Logistic Regression getThreshold found inconsistent.*$", +LogisticRegression, threshold=0.42, thresholds=[0.5, 0.5] +) + class EvaluatorTests(SparkSessionTestCase): @@ -807,18 +819,6 @@ class PersistenceTest(SparkSessionTestCase): except OSError: pass -def logistic_regression_check_thresholds(self): -self.assertIsInstance( -LogisticRegression(threshold=0.5, thresholds=[0.5, 0.5]), -LogisticRegressionModel -) - -self.assertRaisesRegexp( -ValueError, -"Logistic Regression getThreshold found inconsistent.*$", -LogisticRegression, threshold=0.42, thresholds=[0.5, 0.5] -) - def _compare_params(self, m1, m2, param): """ Compare 2 ML Params instances for the given param, and assert both have the same param value - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20764][ML][PYSPARK][FOLLOWUP] Fix visibility discrepancy with numInstances and degreesOfFreedom in LR and GLR - Python version
Repository: spark Updated Branches: refs/heads/branch-2.2 ee9d5975e -> e936a96ba [SPARK-20764][ML][PYSPARK][FOLLOWUP] Fix visibility discrepancy with numInstances and degreesOfFreedom in LR and GLR - Python version ## What changes were proposed in this pull request? Add test cases for PR-18062 ## How was this patch tested? The existing UT Author: PengCloses #18068 from mpjlu/moreTest. (cherry picked from commit 9afcf127d31b5477a539dde6e5f01861532a1c4c) Signed-off-by: Yanbo Liang Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e936a96b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e936a96b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e936a96b Branch: refs/heads/branch-2.2 Commit: e936a96badfeeb2051ee35dc4b0fbecefa9bf4cb Parents: ee9d597 Author: Peng Authored: Wed May 24 19:54:17 2017 +0800 Committer: Yanbo Liang Committed: Wed May 24 19:54:58 2017 +0800 -- python/pyspark/ml/tests.py | 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e936a96b/python/pyspark/ml/tests.py -- diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 51a3e8e..a3393c6 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -1066,6 +1066,7 @@ class TrainingSummaryTest(SparkSessionTestCase): self.assertAlmostEqual(s.r2, 1.0, 2) self.assertTrue(isinstance(s.residuals, DataFrame)) self.assertEqual(s.numInstances, 2) +self.assertEqual(s.degreesOfFreedom, 1) devResiduals = s.devianceResiduals self.assertTrue(isinstance(devResiduals, list) and isinstance(devResiduals[0], float)) coefStdErr = s.coefficientStandardErrors @@ -1075,7 +1076,8 @@ class TrainingSummaryTest(SparkSessionTestCase): pValues = s.pValues self.assertTrue(isinstance(pValues, list) and isinstance(pValues[0], float)) # test evaluation (with training dataset) produces a summary with same values -# one check is enough to verify a summary is returned, Scala version runs full test +# one check is enough to verify a summary is returned +# The child class LinearRegressionTrainingSummary runs full test sameSummary = model.evaluate(df) self.assertAlmostEqual(sameSummary.explainedVariance, s.explainedVariance) @@ -1093,6 +1095,7 @@ class TrainingSummaryTest(SparkSessionTestCase): self.assertEqual(s.numIterations, 1) # this should default to a single iteration of WLS self.assertTrue(isinstance(s.predictions, DataFrame)) self.assertEqual(s.predictionCol, "prediction") +self.assertEqual(s.numInstances, 2) self.assertTrue(isinstance(s.residuals(), DataFrame)) self.assertTrue(isinstance(s.residuals("pearson"), DataFrame)) coefStdErr = s.coefficientStandardErrors @@ -,7 +1114,8 @@ class TrainingSummaryTest(SparkSessionTestCase): self.assertTrue(isinstance(s.nullDeviance, float)) self.assertTrue(isinstance(s.dispersion, float)) # test evaluation (with training dataset) produces a summary with same values -# one check is enough to verify a summary is returned, Scala version runs full test +# one check is enough to verify a summary is returned +# The child class GeneralizedLinearRegressionTrainingSummary runs full test sameSummary = model.evaluate(df) self.assertAlmostEqual(sameSummary.deviance, s.deviance) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20764][ML][PYSPARK][FOLLOWUP] Fix visibility discrepancy with numInstances and degreesOfFreedom in LR and GLR - Python version
Repository: spark Updated Branches: refs/heads/master d76633e3c -> 9afcf127d [SPARK-20764][ML][PYSPARK][FOLLOWUP] Fix visibility discrepancy with numInstances and degreesOfFreedom in LR and GLR - Python version ## What changes were proposed in this pull request? Add test cases for PR-18062 ## How was this patch tested? The existing UT Author: PengCloses #18068 from mpjlu/moreTest. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9afcf127 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9afcf127 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9afcf127 Branch: refs/heads/master Commit: 9afcf127d31b5477a539dde6e5f01861532a1c4c Parents: d76633e Author: Peng Authored: Wed May 24 19:54:17 2017 +0800 Committer: Yanbo Liang Committed: Wed May 24 19:54:17 2017 +0800 -- python/pyspark/ml/tests.py | 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9afcf127/python/pyspark/ml/tests.py -- diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 51a3e8e..a3393c6 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -1066,6 +1066,7 @@ class TrainingSummaryTest(SparkSessionTestCase): self.assertAlmostEqual(s.r2, 1.0, 2) self.assertTrue(isinstance(s.residuals, DataFrame)) self.assertEqual(s.numInstances, 2) +self.assertEqual(s.degreesOfFreedom, 1) devResiduals = s.devianceResiduals self.assertTrue(isinstance(devResiduals, list) and isinstance(devResiduals[0], float)) coefStdErr = s.coefficientStandardErrors @@ -1075,7 +1076,8 @@ class TrainingSummaryTest(SparkSessionTestCase): pValues = s.pValues self.assertTrue(isinstance(pValues, list) and isinstance(pValues[0], float)) # test evaluation (with training dataset) produces a summary with same values -# one check is enough to verify a summary is returned, Scala version runs full test +# one check is enough to verify a summary is returned +# The child class LinearRegressionTrainingSummary runs full test sameSummary = model.evaluate(df) self.assertAlmostEqual(sameSummary.explainedVariance, s.explainedVariance) @@ -1093,6 +1095,7 @@ class TrainingSummaryTest(SparkSessionTestCase): self.assertEqual(s.numIterations, 1) # this should default to a single iteration of WLS self.assertTrue(isinstance(s.predictions, DataFrame)) self.assertEqual(s.predictionCol, "prediction") +self.assertEqual(s.numInstances, 2) self.assertTrue(isinstance(s.residuals(), DataFrame)) self.assertTrue(isinstance(s.residuals("pearson"), DataFrame)) coefStdErr = s.coefficientStandardErrors @@ -,7 +1114,8 @@ class TrainingSummaryTest(SparkSessionTestCase): self.assertTrue(isinstance(s.nullDeviance, float)) self.assertTrue(isinstance(s.dispersion, float)) # test evaluation (with training dataset) produces a summary with same values -# one check is enough to verify a summary is returned, Scala version runs full test +# one check is enough to verify a summary is returned +# The child class GeneralizedLinearRegressionTrainingSummary runs full test sameSummary = model.evaluate(df) self.assertAlmostEqual(sameSummary.deviance, s.deviance) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-18406][CORE] Race between end-of-task and completion iterator read lock release
Repository: spark Updated Branches: refs/heads/master 9434280cf -> d76633e3c [SPARK-18406][CORE] Race between end-of-task and completion iterator read lock release ## What changes were proposed in this pull request? When a TaskContext is not propagated properly to all child threads for the task, just like the reported cases in this issue, we fail to get to TID from TaskContext and that causes unable to release the lock and assertion failures. To resolve this, we have to explicitly pass the TID value to the `unlock` method. ## How was this patch tested? Add new failing regression test case in `RDDSuite`. Author: Xingbo JiangCloses #18076 from jiangxb1987/completion-iterator. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d76633e3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d76633e3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d76633e3 Branch: refs/heads/master Commit: d76633e3cad341b9efa23629f33c5ce90993d6d4 Parents: 9434280 Author: Xingbo Jiang Authored: Wed May 24 15:43:23 2017 +0800 Committer: Wenchen Fan Committed: Wed May 24 15:43:23 2017 +0800 -- .../apache/spark/network/BlockDataManager.scala | 2 +- .../apache/spark/storage/BlockInfoManager.scala | 15 ++ .../org/apache/spark/storage/BlockManager.scala | 29 ++-- .../scala/org/apache/spark/rdd/RDDSuite.scala | 18 +++- 4 files changed, 49 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d76633e3/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala index 8f83668..b3f8bfe 100644 --- a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala +++ b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala @@ -46,5 +46,5 @@ trait BlockDataManager { /** * Release locks acquired by [[putBlockData()]] and [[getBlockData()]]. */ - def releaseLock(blockId: BlockId): Unit + def releaseLock(blockId: BlockId, taskAttemptId: Option[Long]): Unit } http://git-wip-us.apache.org/repos/asf/spark/blob/d76633e3/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala index 3db5983..7064872 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala @@ -281,22 +281,27 @@ private[storage] class BlockInfoManager extends Logging { /** * Release a lock on the given block. + * In case a TaskContext is not propagated properly to all child threads for the task, we fail to + * get the TID from TaskContext, so we have to explicitly pass the TID value to release the lock. + * + * See SPARK-18406 for more discussion of this issue. 
*/ - def unlock(blockId: BlockId): Unit = synchronized { -logTrace(s"Task $currentTaskAttemptId releasing lock for $blockId") + def unlock(blockId: BlockId, taskAttemptId: Option[TaskAttemptId] = None): Unit = synchronized { +val taskId = taskAttemptId.getOrElse(currentTaskAttemptId) +logTrace(s"Task $taskId releasing lock for $blockId") val info = get(blockId).getOrElse { throw new IllegalStateException(s"Block $blockId not found") } if (info.writerTask != BlockInfo.NO_WRITER) { info.writerTask = BlockInfo.NO_WRITER - writeLocksByTask.removeBinding(currentTaskAttemptId, blockId) + writeLocksByTask.removeBinding(taskId, blockId) } else { assert(info.readerCount > 0, s"Block $blockId is not locked for reading") info.readerCount -= 1 - val countsForTask = readLocksByTask(currentTaskAttemptId) + val countsForTask = readLocksByTask(taskId) val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1 assert(newPinCountForTask >= 0, -s"Task $currentTaskAttemptId release lock on block $blockId more times than it acquired it") +s"Task $taskId release lock on block $blockId more times than it acquired it") } notifyAll() } http://git-wip-us.apache.org/repos/asf/spark/blob/d76633e3/core/src/main/scala/org/apache/spark/storage/BlockManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
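The heart of the change above is the new unlock signature: an explicitly supplied task attempt id overrides whatever the current thread's TaskContext would report, so a completion iterator finishing on a child thread can still release the lock its task acquired. Below is a condensed, self-contained sketch of just that signature; the lock bookkeeping and the TaskContext lookup are stubbed out, so this is not the real BlockInfoManager.

    object LockReleaseSketch {
      type TaskAttemptId = Long

      // Stand-in for the id normally derived from TaskContext on the calling thread.
      private def currentTaskAttemptId: TaskAttemptId = -1024L

      // An explicitly passed task attempt id wins over the current thread's id, so a
      // completion iterator running on a child thread (with no TaskContext of its own)
      // can still release the read lock that its task acquired.
      def unlock(blockId: String, taskAttemptId: Option[TaskAttemptId] = None): Unit = {
        val taskId = taskAttemptId.getOrElse(currentTaskAttemptId)
        println(s"Task $taskId releasing lock for $blockId")
      }

      def main(args: Array[String]): Unit = {
        unlock("rdd_0_0")            // usual path: id resolved from the current task
        unlock("rdd_0_0", Some(42L)) // end-of-task path: id captured when the lock was taken
      }
    }

Call sites that already run inside the task keep calling unlock(blockId); the end-of-task and completion-iterator paths pass the id they captured when the lock was acquired.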
spark git commit: [SPARK-18406][CORE] Race between end-of-task and completion iterator read lock release
Repository: spark Updated Branches: refs/heads/branch-2.2 00dee3902 -> ee9d5975e [SPARK-18406][CORE] Race between end-of-task and completion iterator read lock release ## What changes were proposed in this pull request? When a TaskContext is not propagated properly to all child threads for the task, just like the reported cases in this issue, we fail to get to TID from TaskContext and that causes unable to release the lock and assertion failures. To resolve this, we have to explicitly pass the TID value to the `unlock` method. ## How was this patch tested? Add new failing regression test case in `RDDSuite`. Author: Xingbo JiangCloses #18076 from jiangxb1987/completion-iterator. (cherry picked from commit d76633e3cad341b9efa23629f33c5ce90993d6d4) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee9d5975 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee9d5975 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee9d5975 Branch: refs/heads/branch-2.2 Commit: ee9d5975e6dbc5cb1dfe498870f94b1d760098db Parents: 00dee39 Author: Xingbo Jiang Authored: Wed May 24 15:43:23 2017 +0800 Committer: Wenchen Fan Committed: Wed May 24 15:43:38 2017 +0800 -- .../apache/spark/network/BlockDataManager.scala | 2 +- .../apache/spark/storage/BlockInfoManager.scala | 15 ++ .../org/apache/spark/storage/BlockManager.scala | 29 ++-- .../scala/org/apache/spark/rdd/RDDSuite.scala | 18 +++- 4 files changed, 49 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ee9d5975/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala index 8f83668..b3f8bfe 100644 --- a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala +++ b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala @@ -46,5 +46,5 @@ trait BlockDataManager { /** * Release locks acquired by [[putBlockData()]] and [[getBlockData()]]. */ - def releaseLock(blockId: BlockId): Unit + def releaseLock(blockId: BlockId, taskAttemptId: Option[Long]): Unit } http://git-wip-us.apache.org/repos/asf/spark/blob/ee9d5975/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala index 3db5983..7064872 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala @@ -281,22 +281,27 @@ private[storage] class BlockInfoManager extends Logging { /** * Release a lock on the given block. + * In case a TaskContext is not propagated properly to all child threads for the task, we fail to + * get the TID from TaskContext, so we have to explicitly pass the TID value to release the lock. + * + * See SPARK-18406 for more discussion of this issue. 
*/ - def unlock(blockId: BlockId): Unit = synchronized { -logTrace(s"Task $currentTaskAttemptId releasing lock for $blockId") + def unlock(blockId: BlockId, taskAttemptId: Option[TaskAttemptId] = None): Unit = synchronized { +val taskId = taskAttemptId.getOrElse(currentTaskAttemptId) +logTrace(s"Task $taskId releasing lock for $blockId") val info = get(blockId).getOrElse { throw new IllegalStateException(s"Block $blockId not found") } if (info.writerTask != BlockInfo.NO_WRITER) { info.writerTask = BlockInfo.NO_WRITER - writeLocksByTask.removeBinding(currentTaskAttemptId, blockId) + writeLocksByTask.removeBinding(taskId, blockId) } else { assert(info.readerCount > 0, s"Block $blockId is not locked for reading") info.readerCount -= 1 - val countsForTask = readLocksByTask(currentTaskAttemptId) + val countsForTask = readLocksByTask(taskId) val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1 assert(newPinCountForTask >= 0, -s"Task $currentTaskAttemptId release lock on block $blockId more times than it acquired it") +s"Task $taskId release lock on block $blockId more times than it acquired it") } notifyAll() } http://git-wip-us.apache.org/repos/asf/spark/blob/ee9d5975/core/src/main/scala/org/apache/spark/storage/BlockManager.scala