spark git commit: [SPARK-20848][SQL][FOLLOW-UP] Shutdown the pool after reading parquet files

2017-05-24 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 c3302e81e -> 7015f6f0e


[SPARK-20848][SQL][FOLLOW-UP] Shutdown the pool after reading parquet files

## What changes were proposed in this pull request?

This is a follow-up to #18073. It takes a safer approach to shutting down the pool,
to prevent possible issues. It also uses `ThreadUtils.newForkJoinPool` instead, to
set a better thread name.
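
As a quick reference, the following is a minimal, hedged sketch of the resulting pattern: give the parallel collection a dedicated pool and shut it down in a single `finally` after the whole traversal, instead of inside the per-file closure. It uses Scala 2.11-era parallel collections (as in Spark 2.x) and a plain `ForkJoinPool`, since `ThreadUtils.newForkJoinPool` is Spark-internal; the file names and the per-file work are illustrative only.

```scala
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

object PoolShutdownSketch {
  // Runs the per-file work on a dedicated 8-thread pool and always releases it.
  def processInParallel(paths: Seq[String]): Seq[Int] = {
    val parPaths = paths.par
    val pool = new ForkJoinPool(8)  // Spark names these threads via ThreadUtils.newForkJoinPool
    parPaths.tasksupport = new ForkJoinTaskSupport(pool)
    try {
      parPaths.map(_.length).seq    // stand-in for reading one Parquet footer per file
    } finally {
      pool.shutdown()               // shut down once, after the whole parallel traversal
    }
  }

  def main(args: Array[String]): Unit =
    println(processInParallel(Seq("part-00000.parquet", "part-00001.parquet")))
}
```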

## How was this patch tested?

Manually tested.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Liang-Chi Hsieh 

Closes #18100 from viirya/SPARK-20848-followup.

(cherry picked from commit 6b68d61cf31748a088778dfdd66491b2f89a3c7b)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7015f6f0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7015f6f0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7015f6f0

Branch: refs/heads/branch-2.1
Commit: 7015f6f0e7889616db9595b4e68b5a5b5ffe921a
Parents: c3302e8
Author: Liang-Chi Hsieh 
Authored: Thu May 25 09:55:45 2017 +0800
Committer: Wenchen Fan 
Committed: Thu May 25 09:56:16 2017 +0800

--
 .../datasources/parquet/ParquetFileFormat.scala | 42 ++--
 1 file changed, 22 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7015f6f0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 70eb01c..f303a01 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -50,7 +50,7 @@ import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
 
 class ParquetFileFormat
   extends FileFormat
@@ -496,27 +496,29 @@ object ParquetFileFormat extends Logging {
       partFiles: Seq[FileStatus],
       ignoreCorruptFiles: Boolean): Seq[Footer] = {
     val parFiles = partFiles.par
-    val pool = new ForkJoinPool(8)
+    val pool = ThreadUtils.newForkJoinPool("readingParquetFooters", 8)
     parFiles.tasksupport = new ForkJoinTaskSupport(pool)
-    parFiles.flatMap { currentFile =>
-      try {
-        // Skips row group information since we only need the schema.
-        // ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
-        // when it can't read the footer.
-        Some(new Footer(currentFile.getPath(),
-          ParquetFileReader.readFooter(
-            conf, currentFile, SKIP_ROW_GROUPS)))
-      } catch { case e: RuntimeException =>
-        if (ignoreCorruptFiles) {
-          logWarning(s"Skipped the footer in the corrupted file: $currentFile", e)
-          None
-        } else {
-          throw new IOException(s"Could not read footer for file: $currentFile", e)
+    try {
+      parFiles.flatMap { currentFile =>
+        try {
+          // Skips row group information since we only need the schema.
+          // ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
+          // when it can't read the footer.
+          Some(new Footer(currentFile.getPath(),
+            ParquetFileReader.readFooter(
+              conf, currentFile, SKIP_ROW_GROUPS)))
+        } catch { case e: RuntimeException =>
+          if (ignoreCorruptFiles) {
+            logWarning(s"Skipped the footer in the corrupted file: $currentFile", e)
+            None
+          } else {
+            throw new IOException(s"Could not read footer for file: $currentFile", e)
+          }
         }
-      } finally {
-        pool.shutdown()
-      }
-    }.seq
+      }.seq
+    } finally {
+      pool.shutdown()
+    }
   }
 
   /**





spark git commit: [SPARK-20848][SQL][FOLLOW-UP] Shutdown the pool after reading parquet files

2017-05-24 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 3f82d65bf -> e0aa23939


[SPARK-20848][SQL][FOLLOW-UP] Shutdown the pool after reading parquet files

## What changes were proposed in this pull request?

This is a follow-up to #18073. It takes a safer approach to shutting down the pool,
to prevent possible issues. It also uses `ThreadUtils.newForkJoinPool` instead, to
set a better thread name.

## How was this patch tested?

Manually tested.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Liang-Chi Hsieh 

Closes #18100 from viirya/SPARK-20848-followup.

(cherry picked from commit 6b68d61cf31748a088778dfdd66491b2f89a3c7b)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e0aa2393
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e0aa2393
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e0aa2393

Branch: refs/heads/branch-2.2
Commit: e0aa23939a4cbf95f2cc83a7f5adee841b491358
Parents: 3f82d65
Author: Liang-Chi Hsieh 
Authored: Thu May 25 09:55:45 2017 +0800
Committer: Wenchen Fan 
Committed: Thu May 25 09:55:59 2017 +0800

--
 .../datasources/parquet/ParquetFileFormat.scala | 42 ++--
 1 file changed, 22 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e0aa2393/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 29ed890..87fbf8b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -50,7 +50,7 @@ import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
 
 class ParquetFileFormat
   extends FileFormat
@@ -479,27 +479,29 @@ object ParquetFileFormat extends Logging {
       partFiles: Seq[FileStatus],
       ignoreCorruptFiles: Boolean): Seq[Footer] = {
     val parFiles = partFiles.par
-    val pool = new ForkJoinPool(8)
+    val pool = ThreadUtils.newForkJoinPool("readingParquetFooters", 8)
     parFiles.tasksupport = new ForkJoinTaskSupport(pool)
-    parFiles.flatMap { currentFile =>
-      try {
-        // Skips row group information since we only need the schema.
-        // ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
-        // when it can't read the footer.
-        Some(new Footer(currentFile.getPath(),
-          ParquetFileReader.readFooter(
-            conf, currentFile, SKIP_ROW_GROUPS)))
-      } catch { case e: RuntimeException =>
-        if (ignoreCorruptFiles) {
-          logWarning(s"Skipped the footer in the corrupted file: $currentFile", e)
-          None
-        } else {
-          throw new IOException(s"Could not read footer for file: $currentFile", e)
+    try {
+      parFiles.flatMap { currentFile =>
+        try {
+          // Skips row group information since we only need the schema.
+          // ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
+          // when it can't read the footer.
+          Some(new Footer(currentFile.getPath(),
+            ParquetFileReader.readFooter(
+              conf, currentFile, SKIP_ROW_GROUPS)))
+        } catch { case e: RuntimeException =>
+          if (ignoreCorruptFiles) {
+            logWarning(s"Skipped the footer in the corrupted file: $currentFile", e)
+            None
+          } else {
+            throw new IOException(s"Could not read footer for file: $currentFile", e)
+          }
         }
-      } finally {
-        pool.shutdown()
-      }
-    }.seq
+      }.seq
+    } finally {
+      pool.shutdown()
+    }
   }
 
   /**





spark git commit: [SPARK-20848][SQL][FOLLOW-UP] Shutdown the pool after reading parquet files

2017-05-24 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 197f9018a -> 6b68d61cf


[SPARK-20848][SQL][FOLLOW-UP] Shutdown the pool after reading parquet files

## What changes were proposed in this pull request?

This is a follow-up to #18073. It takes a safer approach to shutting down the pool,
to prevent possible issues. It also uses `ThreadUtils.newForkJoinPool` instead, to
set a better thread name.

## How was this patch tested?

Manually tested.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Liang-Chi Hsieh 

Closes #18100 from viirya/SPARK-20848-followup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6b68d61c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6b68d61c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6b68d61c

Branch: refs/heads/master
Commit: 6b68d61cf31748a088778dfdd66491b2f89a3c7b
Parents: 197f901
Author: Liang-Chi Hsieh 
Authored: Thu May 25 09:55:45 2017 +0800
Committer: Wenchen Fan 
Committed: Thu May 25 09:55:45 2017 +0800

--
 .../datasources/parquet/ParquetFileFormat.scala | 42 ++--
 1 file changed, 22 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6b68d61c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 29ed890..87fbf8b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -50,7 +50,7 @@ import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
 
 class ParquetFileFormat
   extends FileFormat
@@ -479,27 +479,29 @@ object ParquetFileFormat extends Logging {
       partFiles: Seq[FileStatus],
       ignoreCorruptFiles: Boolean): Seq[Footer] = {
     val parFiles = partFiles.par
-    val pool = new ForkJoinPool(8)
+    val pool = ThreadUtils.newForkJoinPool("readingParquetFooters", 8)
     parFiles.tasksupport = new ForkJoinTaskSupport(pool)
-    parFiles.flatMap { currentFile =>
-      try {
-        // Skips row group information since we only need the schema.
-        // ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
-        // when it can't read the footer.
-        Some(new Footer(currentFile.getPath(),
-          ParquetFileReader.readFooter(
-            conf, currentFile, SKIP_ROW_GROUPS)))
-      } catch { case e: RuntimeException =>
-        if (ignoreCorruptFiles) {
-          logWarning(s"Skipped the footer in the corrupted file: $currentFile", e)
-          None
-        } else {
-          throw new IOException(s"Could not read footer for file: $currentFile", e)
+    try {
+      parFiles.flatMap { currentFile =>
+        try {
+          // Skips row group information since we only need the schema.
+          // ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
+          // when it can't read the footer.
+          Some(new Footer(currentFile.getPath(),
+            ParquetFileReader.readFooter(
+              conf, currentFile, SKIP_ROW_GROUPS)))
+        } catch { case e: RuntimeException =>
+          if (ignoreCorruptFiles) {
+            logWarning(s"Skipped the footer in the corrupted file: $currentFile", e)
+            None
+          } else {
+            throw new IOException(s"Could not read footer for file: $currentFile", e)
+          }
         }
-      } finally {
-        pool.shutdown()
-      }
-    }.seq
+      }.seq
+    } finally {
+      pool.shutdown()
+    }
   }
 
   /**





spark git commit: [SPARK-20403][SQL] Modify the instructions of some functions

2017-05-24 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 ae65d3014 -> 3f82d65bf


[SPARK-20403][SQL] Modify the instructions of some functions

## What changes were proposed in this pull request?
1. Add a description for the 'cast' function when using the 'show functions' and
   'desc function cast' commands in spark-sql.
2. Modify the descriptions of the cast-alias functions, such as boolean, tinyint,
   smallint, int, bigint, float, double, decimal, date, timestamp, binary, and string.

## How was this patch tested?
Before modification:
spark-sql>desc function boolean;
Function: boolean
Class: org.apache.spark.sql.catalyst.expressions.Cast
Usage: boolean(expr AS type) - Casts the value `expr` to the target data type 
`type`.

After modification:
spark-sql> desc function boolean;
Function: boolean
Class: org.apache.spark.sql.catalyst.expressions.Cast
Usage: boolean(expr) - Casts the value `expr` to the target data type `boolean`.

spark-sql> desc function cast
Function: cast
Class: org.apache.spark.sql.catalyst.expressions.Cast
Usage: cast(expr AS type) - Casts the value `expr` to the target data type 
`type`.
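
As a hedged usage note (not part of this patch): the same descriptions can also be checked programmatically from a `SparkSession`; the `spark` variable below is assumed to already exist (e.g. in spark-shell).

```scala
spark.sql("DESCRIBE FUNCTION boolean").show(truncate = false)  // usage now reads boolean(expr)
spark.sql("DESCRIBE FUNCTION cast").show(truncate = false)     // usage reads cast(expr AS type)
spark.sql("SELECT boolean(1), cast('42' AS int)").show()       // alias vs. explicit cast
```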

Author: liuxian 

Closes #17698 from 10110346/wip_lx_0418.

(cherry picked from commit 197f9018a4641c8fc0725905ebfb535b61bed791)
Signed-off-by: Xiao Li 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3f82d65b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3f82d65b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3f82d65b

Branch: refs/heads/branch-2.2
Commit: 3f82d65bf6a628b0d46bb2eded9ed12f1d5aa9d2
Parents: ae65d30
Author: liuxian 
Authored: Wed May 24 17:32:02 2017 -0700
Committer: Xiao Li 
Committed: Wed May 24 17:32:18 2017 -0700

--
 .../catalyst/analysis/FunctionRegistry.scala|  6 -
 .../catalyst/expressions/mathExpressions.scala  |  2 +-
 .../test/resources/sql-tests/inputs/cast.sql|  2 ++
 .../resources/sql-tests/results/cast.sql.out| 23 +++-
 4 files changed, 30 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3f82d65b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 6fc154f..96b6b11 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -430,6 +430,8 @@ object FunctionRegistry {
     expression[StructsToJson]("to_json"),
     expression[JsonToStructs]("from_json"),
 
+    // cast
+    expression[Cast]("cast"),
     // Cast aliases (SPARK-16730)
     castAlias("boolean", BooleanType),
     castAlias("tinyint", ByteType),
@@ -512,7 +514,9 @@ object FunctionRegistry {
       }
       Cast(args.head, dataType)
     }
-    (name, (expressionInfo[Cast](name), builder))
+    val clazz = scala.reflect.classTag[Cast].runtimeClass
+    val usage = "_FUNC_(expr) - Casts the value `expr` to the target data type `_FUNC_`."
+    (name, (new ExpressionInfo(clazz.getCanonicalName, null, name, usage, null), builder))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/3f82d65b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala
index de1a46d..e040ad0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala
@@ -966,7 +966,7 @@ case class Logarithm(left: Expression, right: Expression)
  *
  * @param child expr to be round, all [[NumericType]] is allowed as Input
  * @param scale new scale to be round to, this should be a constant int at runtime
- * @param mode rounding mode (e.g. HALF_UP, HALF_UP)
+ * @param mode rounding mode (e.g. HALF_UP, HALF_EVEN)
  * @param modeStr rounding mode string name (e.g. "ROUND_HALF_UP", "ROUND_HALF_EVEN")
  */
 abstract class RoundBase(child: Expression, scale: Expression,

http://git-wip-us.apache.org/repos/asf/spark/blob/3f82d65b/sql/core/src/test/resources/sql-tests/inputs/cast.sql
--
diff --git 

spark git commit: [SPARK-20403][SQL] Modify the instructions of some functions

2017-05-24 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 5f8ff2fc9 -> 197f9018a


[SPARK-20403][SQL] Modify the instructions of some functions

## What changes were proposed in this pull request?
1. Add a description for the 'cast' function when using the 'show functions' and
   'desc function cast' commands in spark-sql.
2. Modify the descriptions of the cast-alias functions, such as boolean, tinyint,
   smallint, int, bigint, float, double, decimal, date, timestamp, binary, and string.

## How was this patch tested?
Before modification:
spark-sql>desc function boolean;
Function: boolean
Class: org.apache.spark.sql.catalyst.expressions.Cast
Usage: boolean(expr AS type) - Casts the value `expr` to the target data type 
`type`.

After modification:
spark-sql> desc function boolean;
Function: boolean
Class: org.apache.spark.sql.catalyst.expressions.Cast
Usage: boolean(expr) - Casts the value `expr` to the target data type `boolean`.

spark-sql> desc function cast
Function: cast
Class: org.apache.spark.sql.catalyst.expressions.Cast
Usage: cast(expr AS type) - Casts the value `expr` to the target data type 
`type`.

Author: liuxian 

Closes #17698 from 10110346/wip_lx_0418.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/197f9018
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/197f9018
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/197f9018

Branch: refs/heads/master
Commit: 197f9018a4641c8fc0725905ebfb535b61bed791
Parents: 5f8ff2f
Author: liuxian 
Authored: Wed May 24 17:32:02 2017 -0700
Committer: Xiao Li 
Committed: Wed May 24 17:32:02 2017 -0700

--
 .../catalyst/analysis/FunctionRegistry.scala|  6 -
 .../catalyst/expressions/mathExpressions.scala  |  2 +-
 .../test/resources/sql-tests/inputs/cast.sql|  2 ++
 .../resources/sql-tests/results/cast.sql.out| 23 +++-
 4 files changed, 30 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/197f9018/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index d2042ad..7521a7e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -431,6 +431,8 @@ object FunctionRegistry {
     expression[StructsToJson]("to_json"),
     expression[JsonToStructs]("from_json"),
 
+    // cast
+    expression[Cast]("cast"),
     // Cast aliases (SPARK-16730)
     castAlias("boolean", BooleanType),
     castAlias("tinyint", ByteType),
@@ -513,7 +515,9 @@ object FunctionRegistry {
       }
       Cast(args.head, dataType)
     }
-    (name, (expressionInfo[Cast](name), builder))
+    val clazz = scala.reflect.classTag[Cast].runtimeClass
+    val usage = "_FUNC_(expr) - Casts the value `expr` to the target data type `_FUNC_`."
+    (name, (new ExpressionInfo(clazz.getCanonicalName, null, name, usage, null), builder))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/197f9018/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala
index bf46a39..754b5c4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala
@@ -982,7 +982,7 @@ case class Logarithm(left: Expression, right: Expression)
  *
  * @param child expr to be round, all [[NumericType]] is allowed as Input
  * @param scale new scale to be round to, this should be a constant int at runtime
- * @param mode rounding mode (e.g. HALF_UP, HALF_UP)
+ * @param mode rounding mode (e.g. HALF_UP, HALF_EVEN)
  * @param modeStr rounding mode string name (e.g. "ROUND_HALF_UP", "ROUND_HALF_EVEN")
  */
 abstract class RoundBase(child: Expression, scale: Expression,

http://git-wip-us.apache.org/repos/asf/spark/blob/197f9018/sql/core/src/test/resources/sql-tests/inputs/cast.sql
--
diff --git a/sql/core/src/test/resources/sql-tests/inputs/cast.sql 
b/sql/core/src/test/resources/sql-tests/inputs/cast.sql
index 

spark git commit: [SPARK-18406][CORE][BACKPORT-2.1] Race between end-of-task and completion iterator read lock release

2017-05-24 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 2f68631f5 -> c3302e81e


[SPARK-18406][CORE][BACKPORT-2.1] Race between end-of-task and completion 
iterator read lock release

This is a backport PR of  #18076 to 2.1.

## What changes were proposed in this pull request?

When a TaskContext is not propagated properly to all child threads for the task,
as in the cases reported in this issue, we fail to get the TID from the TaskContext,
which leaves us unable to release the lock and leads to assertion failures. To
resolve this, we have to explicitly pass the TID value to the `unlock` method.
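
A minimal sketch of that fallback pattern, with made-up names rather than Spark's actual `BlockInfoManager` API: prefer an explicitly supplied task attempt id, and only fall back to thread-local state when the caller really is the task's own thread.

```scala
object LockReleaseSketch {
  // Thread-local stand-in for TaskContext; a child thread that never had the
  // context propagated sees null here.
  private val currentTask = new ThreadLocal[java.lang.Long]

  def unlock(blockId: String, taskAttemptId: Option[Long] = None): Unit = {
    val taskId = taskAttemptId.getOrElse {
      Option(currentTask.get).map(_.longValue).getOrElse(-1L)  // -1 marks an unknown caller
    }
    println(s"Task $taskId releasing lock for $blockId")
  }

  def main(args: Array[String]): Unit = {
    currentTask.set(7L)
    unlock("rdd_0_0")                            // resolved from the thread-local context
    unlock("rdd_0_1", taskAttemptId = Some(7L))  // what a child thread should do instead
  }
}
```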

## How was this patch tested?

Add new failing regression test case in `RDDSuite`.

Author: Xingbo Jiang 

Closes #18099 from jiangxb1987/completion-iterator-2.1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c3302e81
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c3302e81
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c3302e81

Branch: refs/heads/branch-2.1
Commit: c3302e81e3e74744ec8da66b18f81c233a15e918
Parents: 2f68631
Author: Xingbo Jiang 
Authored: Thu May 25 08:31:04 2017 +0800
Committer: Wenchen Fan 
Committed: Thu May 25 08:31:04 2017 +0800

--
 .../apache/spark/network/BlockDataManager.scala |  2 +-
 .../apache/spark/storage/BlockInfoManager.scala | 15 +-
 .../org/apache/spark/storage/BlockManager.scala | 21 +++-
 .../scala/org/apache/spark/rdd/RDDSuite.scala   | 18 -
 4 files changed, 44 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c3302e81/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala 
b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
index 8f83668..b3f8bfe 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
@@ -46,5 +46,5 @@ trait BlockDataManager {
   /**
* Release locks acquired by [[putBlockData()]] and [[getBlockData()]].
*/
-  def releaseLock(blockId: BlockId): Unit
+  def releaseLock(blockId: BlockId, taskAttemptId: Option[Long]): Unit
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c3302e81/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index dd8f5ba..c0e18e5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -281,22 +281,27 @@ private[storage] class BlockInfoManager extends Logging {
 
   /**
    * Release a lock on the given block.
+   * In case a TaskContext is not propagated properly to all child threads for the task, we fail to
+   * get the TID from TaskContext, so we have to explicitly pass the TID value to release the lock.
+   *
+   * See SPARK-18406 for more discussion of this issue.
    */
-  def unlock(blockId: BlockId): Unit = synchronized {
-    logTrace(s"Task $currentTaskAttemptId releasing lock for $blockId")
+  def unlock(blockId: BlockId, taskAttemptId: Option[TaskAttemptId] = None): Unit = synchronized {
+    val taskId = taskAttemptId.getOrElse(currentTaskAttemptId)
+    logTrace(s"Task $taskId releasing lock for $blockId")
     val info = get(blockId).getOrElse {
       throw new IllegalStateException(s"Block $blockId not found")
     }
     if (info.writerTask != BlockInfo.NO_WRITER) {
       info.writerTask = BlockInfo.NO_WRITER
-      writeLocksByTask.removeBinding(currentTaskAttemptId, blockId)
+      writeLocksByTask.removeBinding(taskId, blockId)
     } else {
       assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
       info.readerCount -= 1
-      val countsForTask = readLocksByTask(currentTaskAttemptId)
+      val countsForTask = readLocksByTask(taskId)
       val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1
       assert(newPinCountForTask >= 0,
-        s"Task $currentTaskAttemptId release lock on block $blockId more times than it acquired it")
+        s"Task $taskId release lock on block $blockId more times than it acquired it")
     }
     notifyAll()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/c3302e81/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
--
diff --git 

spark git commit: [SPARK-16202][SQL][DOC] Follow-up to Correct The Description of CreatableRelationProvider's createRelation

2017-05-24 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 2405afce4 -> ae65d3014


[SPARK-16202][SQL][DOC] Follow-up to Correct The Description of 
CreatableRelationProvider's createRelation

## What changes were proposed in this pull request?

Follow-up to SPARK-16202:

1. Remove the duplicated description of the `SaveMode` values (one mode was in fact
missing, which shows that such duplication can easily become incomplete again in the future)

2. Use standard scaladoc tags

/cc gatorsmile rxin yhuai (as they were involved previously)

## How was this patch tested?

local build

Author: Jacek Laskowski 

Closes #18026 from jaceklaskowski/CreatableRelationProvider-SPARK-16202.

(cherry picked from commit 5f8ff2fc9a859ceeaa8f1d03060fdbb30951e706)
Signed-off-by: Xiao Li 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ae65d301
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ae65d301
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ae65d301

Branch: refs/heads/branch-2.2
Commit: ae65d3014941344a924da583959e6b4b1d1d64f2
Parents: 2405afc
Author: Jacek Laskowski 
Authored: Wed May 24 17:24:23 2017 -0700
Committer: Xiao Li 
Committed: Wed May 24 17:24:33 2017 -0700

--
 .../org/apache/spark/sql/sources/interfaces.scala  | 17 +++--
 1 file changed, 7 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ae65d301/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index ff8b15b..86eeb2f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -163,16 +163,13 @@ trait StreamSinkProvider {
 @InterfaceStability.Stable
 trait CreatableRelationProvider {
   /**
-   * Save the DataFrame to the destination and return a relation with the given parameters based on
-   * the contents of the given DataFrame. The mode specifies the expected behavior of createRelation
-   * when data already exists.
-   * Right now, there are three modes, Append, Overwrite, and ErrorIfExists.
-   * Append mode means that when saving a DataFrame to a data source, if data already exists,
-   * contents of the DataFrame are expected to be appended to existing data.
-   * Overwrite mode means that when saving a DataFrame to a data source, if data already exists,
-   * existing data is expected to be overwritten by the contents of the DataFrame.
-   * ErrorIfExists mode means that when saving a DataFrame to a data source,
-   * if data already exists, an exception is expected to be thrown.
+   * Saves a DataFrame to a destination (using data source-specific parameters)
+   *
+   * @param sqlContext SQLContext
+   * @param mode specifies what happens when the destination already exists
+   * @param parameters data source-specific parameters
+   * @param data DataFrame to save (i.e. the rows after executing the query)
+   * @return Relation with a known schema
    *
    * @since 1.3.0
    */
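
To make the documented contract concrete, here is a hedged sketch of an implementation against the Spark 2.x API; the provider class, its behaviour, and the `numRows` parameter are invented for illustration only.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}
import org.apache.spark.sql.types.StructType

// Toy "sink" that just prints rows; a real provider would honour `mode`
// (Append/Overwrite/ErrorIfExists/Ignore) when the destination already exists.
class ConsoleSinkProvider extends CreatableRelationProvider {
  override def createRelation(
      ctx: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    data.show(parameters.getOrElse("numRows", "20").toInt)
    new BaseRelation {
      override val sqlContext: SQLContext = ctx
      override val schema: StructType = data.schema
    }
  }
}
```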





spark git commit: [SPARK-16202][SQL][DOC] Follow-up to Correct The Description of CreatableRelationProvider's createRelation

2017-05-24 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master c0b3e45e3 -> 5f8ff2fc9


[SPARK-16202][SQL][DOC] Follow-up to Correct The Description of 
CreatableRelationProvider's createRelation

## What changes were proposed in this pull request?

Follow-up to SPARK-16202:

1. Remove the duplicated description of the `SaveMode` values (one mode was in fact
missing, which shows that such duplication can easily become incomplete again in the future)

2. Use standard scaladoc tags

/cc gatorsmile rxin yhuai (as they were involved previously)

## How was this patch tested?

local build

Author: Jacek Laskowski 

Closes #18026 from jaceklaskowski/CreatableRelationProvider-SPARK-16202.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5f8ff2fc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5f8ff2fc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5f8ff2fc

Branch: refs/heads/master
Commit: 5f8ff2fc9a859ceeaa8f1d03060fdbb30951e706
Parents: c0b3e45
Author: Jacek Laskowski 
Authored: Wed May 24 17:24:23 2017 -0700
Committer: Xiao Li 
Committed: Wed May 24 17:24:23 2017 -0700

--
 .../org/apache/spark/sql/sources/interfaces.scala  | 17 +++--
 1 file changed, 7 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5f8ff2fc/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index ff8b15b..86eeb2f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -163,16 +163,13 @@ trait StreamSinkProvider {
 @InterfaceStability.Stable
 trait CreatableRelationProvider {
   /**
-   * Save the DataFrame to the destination and return a relation with the given parameters based on
-   * the contents of the given DataFrame. The mode specifies the expected behavior of createRelation
-   * when data already exists.
-   * Right now, there are three modes, Append, Overwrite, and ErrorIfExists.
-   * Append mode means that when saving a DataFrame to a data source, if data already exists,
-   * contents of the DataFrame are expected to be appended to existing data.
-   * Overwrite mode means that when saving a DataFrame to a data source, if data already exists,
-   * existing data is expected to be overwritten by the contents of the DataFrame.
-   * ErrorIfExists mode means that when saving a DataFrame to a data source,
-   * if data already exists, an exception is expected to be thrown.
+   * Saves a DataFrame to a destination (using data source-specific parameters)
+   *
+   * @param sqlContext SQLContext
+   * @param mode specifies what happens when the destination already exists
+   * @param parameters data source-specific parameters
+   * @param data DataFrame to save (i.e. the rows after executing the query)
+   * @return Relation with a known schema
    *
    * @since 1.3.0
    */





spark git commit: [SPARK-20872][SQL] ShuffleExchange.nodeName should handle null coordinator

2017-05-24 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 b7a2a16b1 -> 2405afce4


[SPARK-20872][SQL] ShuffleExchange.nodeName should handle null coordinator

## What changes were proposed in this pull request?

A one-liner change in `ShuffleExchange.nodeName` to cover the case when 
`coordinator` is `null`, so that the match expression is exhaustive.

Please refer to 
[SPARK-20872](https://issues.apache.org/jira/browse/SPARK-20872) for a 
description of the symptoms.
TL;DR is that inspecting a `ShuffleExchange` (directly or transitively) on the 
Executor side can hit a case where the `coordinator` field of a 
`ShuffleExchange` is null, and thus will trigger a `MatchError` in 
`ShuffleExchange.nodeName()`'s inexhaustive match expression.

Also changed two other match conditions in `ShuffleExchange` on the 
`coordinator` field to be consistent.
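
A hedged, self-contained illustration of why `case _` is needed (simplified types, not the real `ShuffleExchange`): a `@transient` `Option` field can come back as `null` after deserialization, and `null` matches neither `Some(...)` nor `None`, so a two-case match throws a `MatchError`.

```scala
object NodeNameSketch {
  // Mirrors the shape of the fix: `case _` also absorbs a null Option.
  def extraInfo(coordinator: Option[String]): String = coordinator match {
    case Some(c) => s"(coordinator id: $c)"
    case _ => ""  // with `case None => ""` a null coordinator would throw MatchError
  }

  def main(args: Array[String]): Unit = {
    println(extraInfo(Some("c1")))  // (coordinator id: c1)
    println(extraInfo(None))        // empty string
    println(extraInfo(null))        // empty string, instead of scala.MatchError
  }
}
```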

## How was this patch tested?

Manually tested this change with a case where the `coordinator` is null to make 
sure `ShuffleExchange.nodeName` doesn't throw a `MatchError` any more.

Author: Kris Mok 

Closes #18095 from rednaxelafx/shuffleexchange-nodename.

(cherry picked from commit c0b3e45e3b46a5235b748cb85ad200c9ec1bb426)
Signed-off-by: Xiao Li 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2405afce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2405afce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2405afce

Branch: refs/heads/branch-2.2
Commit: 2405afce4e87c0486f2aef1d068f17aea2480b17
Parents: b7a2a16
Author: Kris Mok 
Authored: Wed May 24 17:19:35 2017 -0700
Committer: Xiao Li 
Committed: Wed May 24 17:19:46 2017 -0700

--
 .../spark/sql/execution/exchange/ShuffleExchange.scala  | 9 ++---
 1 file changed, 6 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2405afce/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
index f06544e..eebe6ad 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
@@ -40,6 +40,9 @@ case class ShuffleExchange(
 child: SparkPlan,
 @transient coordinator: Option[ExchangeCoordinator]) extends Exchange {
 
+  // NOTE: coordinator can be null after serialization/deserialization,
+  //   e.g. it can be null on the Executor side
+
   override lazy val metrics = Map(
 "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"))
 
@@ -47,7 +50,7 @@ case class ShuffleExchange(
 val extraInfo = coordinator match {
   case Some(exchangeCoordinator) =>
 s"(coordinator id: ${System.identityHashCode(exchangeCoordinator)})"
-  case None => ""
+  case _ => ""
 }
 
 val simpleNodeName = "Exchange"
@@ -70,7 +73,7 @@ case class ShuffleExchange(
     // the plan.
     coordinator match {
       case Some(exchangeCoordinator) => exchangeCoordinator.registerExchange(this)
-      case None =>
+      case _ =>
     }
   }
 
@@ -117,7 +120,7 @@ case class ShuffleExchange(
   val shuffleRDD = exchangeCoordinator.postShuffleRDD(this)
   assert(shuffleRDD.partitions.length == newPartitioning.numPartitions)
   shuffleRDD
-case None =>
+case _ =>
   val shuffleDependency = prepareShuffleDependency()
   preparePostShuffleRDD(shuffleDependency)
   }





spark git commit: [SPARK-20872][SQL] ShuffleExchange.nodeName should handle null coordinator

2017-05-24 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 95aef660b -> c0b3e45e3


[SPARK-20872][SQL] ShuffleExchange.nodeName should handle null coordinator

## What changes were proposed in this pull request?

A one-liner change in `ShuffleExchange.nodeName` to cover the case when 
`coordinator` is `null`, so that the match expression is exhaustive.

Please refer to 
[SPARK-20872](https://issues.apache.org/jira/browse/SPARK-20872) for a 
description of the symptoms.
TL;DR is that inspecting a `ShuffleExchange` (directly or transitively) on the 
Executor side can hit a case where the `coordinator` field of a 
`ShuffleExchange` is null, and thus will trigger a `MatchError` in 
`ShuffleExchange.nodeName()`'s inexhaustive match expression.

Also changed two other match conditions in `ShuffleExchange` on the 
`coordinator` field to be consistent.

## How was this patch tested?

Manually tested this change with a case where the `coordinator` is null to make 
sure `ShuffleExchange.nodeName` doesn't throw a `MatchError` any more.

Author: Kris Mok 

Closes #18095 from rednaxelafx/shuffleexchange-nodename.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c0b3e45e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c0b3e45e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c0b3e45e

Branch: refs/heads/master
Commit: c0b3e45e3b46a5235b748cb85ad200c9ec1bb426
Parents: 95aef66
Author: Kris Mok 
Authored: Wed May 24 17:19:35 2017 -0700
Committer: Xiao Li 
Committed: Wed May 24 17:19:35 2017 -0700

--
 .../spark/sql/execution/exchange/ShuffleExchange.scala  | 9 ++---
 1 file changed, 6 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c0b3e45e/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
index f06544e..eebe6ad 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
@@ -40,6 +40,9 @@ case class ShuffleExchange(
 child: SparkPlan,
 @transient coordinator: Option[ExchangeCoordinator]) extends Exchange {
 
+  // NOTE: coordinator can be null after serialization/deserialization,
+  //   e.g. it can be null on the Executor side
+
   override lazy val metrics = Map(
 "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"))
 
@@ -47,7 +50,7 @@ case class ShuffleExchange(
 val extraInfo = coordinator match {
   case Some(exchangeCoordinator) =>
 s"(coordinator id: ${System.identityHashCode(exchangeCoordinator)})"
-  case None => ""
+  case _ => ""
 }
 
 val simpleNodeName = "Exchange"
@@ -70,7 +73,7 @@ case class ShuffleExchange(
     // the plan.
     coordinator match {
       case Some(exchangeCoordinator) => exchangeCoordinator.registerExchange(this)
-      case None =>
+      case _ =>
     }
   }
 
@@ -117,7 +120,7 @@ case class ShuffleExchange(
   val shuffleRDD = exchangeCoordinator.postShuffleRDD(this)
   assert(shuffleRDD.partitions.length == newPartitioning.numPartitions)
   shuffleRDD
-case None =>
+case _ =>
   val shuffleDependency = prepareShuffleDependency()
   preparePostShuffleRDD(shuffleDependency)
   }





spark git commit: [SPARK-20205][CORE] Make sure StageInfo is updated before sending event.

2017-05-24 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master a64746677 -> 95aef660b


[SPARK-20205][CORE] Make sure StageInfo is updated before sending event.

The DAGScheduler was sending a "stage submitted" event before it properly
updated the event's information. This meant that a listener (e.g. the
event logging listener) could record wrong information about the event.

This change sets the stage's submission time before the event is submitted,
when there are tasks to be executed in the stage.
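
A tiny, hedged sketch of the ordering concern (illustrative types, not the DAGScheduler API): the mutable info carried by the event has to be filled in before the event is posted, or a listener that logs it immediately records a missing submission time.

```scala
object StageEventOrderingSketch {
  final case class StageInfoSketch(stageId: Int, var submissionTime: Option[Long] = None)

  // Stand-in for a listener bus that logs the event as soon as it is posted.
  def post(info: StageInfoSketch): Unit = println(s"listener recorded: $info")

  def main(args: Array[String]): Unit = {
    val info = StageInfoSketch(stageId = 3)
    val hasTasksToRun = true
    if (hasTasksToRun) {
      info.submissionTime = Some(System.currentTimeMillis())  // set first...
    }
    post(info)                                                // ...then publish
  }
}
```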

Tested with existing unit tests.

Author: Marcelo Vanzin 

Closes #17925 from vanzin/SPARK-20205.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95aef660
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95aef660
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95aef660

Branch: refs/heads/master
Commit: 95aef660b73ec931e746d1ec8ae7848762ba0d7c
Parents: a647466
Author: Marcelo Vanzin 
Authored: Wed May 24 16:57:17 2017 -0700
Committer: Marcelo Vanzin 
Committed: Wed May 24 16:57:17 2017 -0700

--
 .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 8 +++-
 1 file changed, 7 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/95aef660/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 875acc3..ab2255f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -983,6 +983,13 @@ class DAGScheduler(
     }
 
     stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
+
+    // If there are tasks to execute, record the submission time of the stage. Otherwise,
+    // post the even without the submission time, which indicates that this stage was
+    // skipped.
+    if (partitionsToCompute.nonEmpty) {
+      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
+    }
     listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
 
     // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
@@ -1054,7 +1061,6 @@ class DAGScheduler(
         s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
       taskScheduler.submitTasks(new TaskSet(
         tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
-      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
     } else {
       // Because we posted SparkListenerStageSubmitted earlier, we should mark
       // the stage as completed here in case there are no tasks to run





spark git commit: [SPARK-18406][CORE][BACKPORT-2.0] Race between end-of-task and completion iterator read lock release

2017-05-24 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 72e1f83d7 -> 79fbfbbc7


[SPARK-18406][CORE][BACKPORT-2.0] Race between end-of-task and completion 
iterator read lock release

This is a backport PR of  #18076 to 2.0 and 2.1.

## What changes were proposed in this pull request?

When a TaskContext is not propagated properly to all child threads for the task,
as in the cases reported in this issue, we fail to get the TID from the TaskContext,
which leaves us unable to release the lock and leads to assertion failures. To
resolve this, we have to explicitly pass the TID value to the `unlock` method.

## How was this patch tested?

Add new failing regression test case in `RDDSuite`.

Author: Xingbo Jiang 

Closes #18096 from jiangxb1987/completion-iterator-2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/79fbfbbc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/79fbfbbc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/79fbfbbc

Branch: refs/heads/branch-2.0
Commit: 79fbfbbc7ad7fea1ca4124981201e947db67745d
Parents: 72e1f83
Author: Xingbo Jiang 
Authored: Wed May 24 14:34:17 2017 -0700
Committer: Xiao Li 
Committed: Wed May 24 14:34:17 2017 -0700

--
 .../apache/spark/network/BlockDataManager.scala |  2 +-
 .../apache/spark/storage/BlockInfoManager.scala | 15 +-
 .../org/apache/spark/storage/BlockManager.scala | 21 +++-
 .../scala/org/apache/spark/rdd/RDDSuite.scala   | 18 -
 4 files changed, 44 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/79fbfbbc/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala 
b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
index 8f83668..b3f8bfe 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
@@ -46,5 +46,5 @@ trait BlockDataManager {
   /**
* Release locks acquired by [[putBlockData()]] and [[getBlockData()]].
*/
-  def releaseLock(blockId: BlockId): Unit
+  def releaseLock(blockId: BlockId, taskAttemptId: Option[Long]): Unit
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/79fbfbbc/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index dd8f5ba..c0e18e5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -281,22 +281,27 @@ private[storage] class BlockInfoManager extends Logging {
 
   /**
    * Release a lock on the given block.
+   * In case a TaskContext is not propagated properly to all child threads for the task, we fail to
+   * get the TID from TaskContext, so we have to explicitly pass the TID value to release the lock.
+   *
+   * See SPARK-18406 for more discussion of this issue.
    */
-  def unlock(blockId: BlockId): Unit = synchronized {
-    logTrace(s"Task $currentTaskAttemptId releasing lock for $blockId")
+  def unlock(blockId: BlockId, taskAttemptId: Option[TaskAttemptId] = None): Unit = synchronized {
+    val taskId = taskAttemptId.getOrElse(currentTaskAttemptId)
+    logTrace(s"Task $taskId releasing lock for $blockId")
     val info = get(blockId).getOrElse {
       throw new IllegalStateException(s"Block $blockId not found")
     }
     if (info.writerTask != BlockInfo.NO_WRITER) {
       info.writerTask = BlockInfo.NO_WRITER
-      writeLocksByTask.removeBinding(currentTaskAttemptId, blockId)
+      writeLocksByTask.removeBinding(taskId, blockId)
     } else {
       assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
       info.readerCount -= 1
-      val countsForTask = readLocksByTask(currentTaskAttemptId)
+      val countsForTask = readLocksByTask(taskId)
       val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1
       assert(newPinCountForTask >= 0,
-        s"Task $currentTaskAttemptId release lock on block $blockId more times than it acquired it")
+        s"Task $taskId release lock on block $blockId more times than it acquired it")
     }
     notifyAll()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/79fbfbbc/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
--
diff --git 

spark git commit: [SPARK-20867][SQL] Move hints from Statistics into HintInfo class

2017-05-24 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 c59ad420b -> b7a2a16b1


[SPARK-20867][SQL] Move hints from Statistics into HintInfo class

## What changes were proposed in this pull request?
This is a follow-up to SPARK-20857 to move the broadcast hint from Statistics 
into a new HintInfo class, so we can be more flexible in adding new hints in 
the future.
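
A hedged, simplified sketch of the shape of this refactoring (toy classes, not Spark's actual `Statistics`/`HintInfo`): keeping hint flags in their own case class means a future hint extends `HintInfo` instead of adding yet another field to `Statistics`.

```scala
object HintInfoSketch {
  case class HintInfo(isBroadcastable: Option[Boolean] = None)
  case class Statistics(sizeInBytes: BigInt, hints: HintInfo = HintInfo())

  def main(args: Array[String]): Unit = {
    val stats = Statistics(sizeInBytes = BigInt(1024), hints = HintInfo(isBroadcastable = Some(true)))
    // The hint travels with the statistics but lives in its own, easily extended bundle.
    println(stats.hints.isBroadcastable)  // Some(true)
  }
}
```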

## How was this patch tested?
Updated test cases to reflect the change.

Author: Reynold Xin 

Closes #18087 from rxin/SPARK-20867.

(cherry picked from commit a64746677bf09ef67e3fd538355a6ee9b5ce8cf4)
Signed-off-by: Xiao Li 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b7a2a16b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b7a2a16b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b7a2a16b

Branch: refs/heads/branch-2.2
Commit: b7a2a16b1e01375292938fc48b0a333ec4e7cd30
Parents: c59ad42
Author: Reynold Xin 
Authored: Wed May 24 13:57:19 2017 -0700
Committer: Xiao Li 
Committed: Wed May 24 13:57:28 2017 -0700

--
 .../sql/catalyst/analysis/ResolveHints.scala|  6 ++---
 .../catalyst/plans/logical/LogicalPlan.scala|  2 +-
 .../sql/catalyst/plans/logical/Statistics.scala | 11 +++--
 .../plans/logical/basicLogicalOperators.scala   | 17 +++---
 .../sql/catalyst/plans/logical/hints.scala  | 24 
 .../statsEstimation/AggregateEstimation.scala   |  2 +-
 .../catalyst/analysis/ResolveHintsSuite.scala   | 20 
 .../BasicStatsEstimationSuite.scala | 17 +++---
 .../spark/sql/execution/SparkStrategies.scala   |  2 +-
 .../scala/org/apache/spark/sql/functions.scala  |  4 ++--
 .../spark/sql/StatisticsCollectionSuite.scala   |  2 +-
 11 files changed, 59 insertions(+), 48 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b7a2a16b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
index 9dfd84c..86c788a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
@@ -57,9 +57,9 @@ object ResolveHints {
       val newNode = CurrentOrigin.withOrigin(plan.origin) {
         plan match {
           case u: UnresolvedRelation if toBroadcast.exists(resolver(_, u.tableIdentifier.table)) =>
-            ResolvedHint(plan, isBroadcastable = Option(true))
+            ResolvedHint(plan, HintInfo(isBroadcastable = Option(true)))
           case r: SubqueryAlias if toBroadcast.exists(resolver(_, r.alias)) =>
-            ResolvedHint(plan, isBroadcastable = Option(true))
+            ResolvedHint(plan, HintInfo(isBroadcastable = Option(true)))
 
           case _: ResolvedHint | _: View | _: With | _: SubqueryAlias =>
             // Don't traverse down these nodes.
@@ -88,7 +88,7 @@ object ResolveHints {
       case h: UnresolvedHint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
         if (h.parameters.isEmpty) {
           // If there is no table alias specified, turn the entire subtree into a BroadcastHint.
-          ResolvedHint(h.child, isBroadcastable = Option(true))
+          ResolvedHint(h.child, HintInfo(isBroadcastable = Option(true)))
         } else {
           // Otherwise, find within the subtree query plans that should be broadcasted.
           applyBroadcastHint(h.child, h.parameters.toSet)

http://git-wip-us.apache.org/repos/asf/spark/blob/b7a2a16b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 6bdcf49..2ebb2ff 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -347,7 +347,7 @@ abstract class UnaryNode extends LogicalPlan {
     }
 
     // Don't propagate rowCount and attributeStats, since they are not estimated here.
-    Statistics(sizeInBytes = sizeInBytes, isBroadcastable = child.stats(conf).isBroadcastable)
+    Statistics(sizeInBytes = sizeInBytes, hints = child.stats(conf).hints)
   }
 }
 


spark git commit: [SPARK-20867][SQL] Move hints from Statistics into HintInfo class

2017-05-24 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master f72ad303f -> a64746677


[SPARK-20867][SQL] Move hints from Statistics into HintInfo class

## What changes were proposed in this pull request?
This is a follow-up to SPARK-20857 to move the broadcast hint from Statistics 
into a new HintInfo class, so we can be more flexible in adding new hints in 
the future.

## How was this patch tested?
Updated test cases to reflect the change.

Author: Reynold Xin 

Closes #18087 from rxin/SPARK-20867.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6474667
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6474667
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6474667

Branch: refs/heads/master
Commit: a64746677bf09ef67e3fd538355a6ee9b5ce8cf4
Parents: f72ad30
Author: Reynold Xin 
Authored: Wed May 24 13:57:19 2017 -0700
Committer: Xiao Li 
Committed: Wed May 24 13:57:19 2017 -0700

--
 .../sql/catalyst/analysis/ResolveHints.scala|  6 ++---
 .../catalyst/plans/logical/LogicalPlan.scala|  2 +-
 .../sql/catalyst/plans/logical/Statistics.scala | 11 +++--
 .../plans/logical/basicLogicalOperators.scala   | 17 +++---
 .../sql/catalyst/plans/logical/hints.scala  | 24 
 .../statsEstimation/AggregateEstimation.scala   |  2 +-
 .../catalyst/analysis/ResolveHintsSuite.scala   | 20 
 .../BasicStatsEstimationSuite.scala | 17 +++---
 .../spark/sql/execution/SparkStrategies.scala   |  2 +-
 .../scala/org/apache/spark/sql/functions.scala  |  4 ++--
 .../spark/sql/StatisticsCollectionSuite.scala   |  2 +-
 11 files changed, 59 insertions(+), 48 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a6474667/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
index 9dfd84c..86c788a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
@@ -57,9 +57,9 @@ object ResolveHints {
       val newNode = CurrentOrigin.withOrigin(plan.origin) {
         plan match {
           case u: UnresolvedRelation if toBroadcast.exists(resolver(_, u.tableIdentifier.table)) =>
-            ResolvedHint(plan, isBroadcastable = Option(true))
+            ResolvedHint(plan, HintInfo(isBroadcastable = Option(true)))
           case r: SubqueryAlias if toBroadcast.exists(resolver(_, r.alias)) =>
-            ResolvedHint(plan, isBroadcastable = Option(true))
+            ResolvedHint(plan, HintInfo(isBroadcastable = Option(true)))
 
           case _: ResolvedHint | _: View | _: With | _: SubqueryAlias =>
             // Don't traverse down these nodes.
@@ -88,7 +88,7 @@ object ResolveHints {
       case h: UnresolvedHint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
         if (h.parameters.isEmpty) {
           // If there is no table alias specified, turn the entire subtree into a BroadcastHint.
-          ResolvedHint(h.child, isBroadcastable = Option(true))
+          ResolvedHint(h.child, HintInfo(isBroadcastable = Option(true)))
         } else {
           // Otherwise, find within the subtree query plans that should be broadcasted.
           applyBroadcastHint(h.child, h.parameters.toSet)

http://git-wip-us.apache.org/repos/asf/spark/blob/a6474667/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 6bdcf49..2ebb2ff 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -347,7 +347,7 @@ abstract class UnaryNode extends LogicalPlan {
     }
 
     // Don't propagate rowCount and attributeStats, since they are not estimated here.
-    Statistics(sizeInBytes = sizeInBytes, isBroadcastable = child.stats(conf).isBroadcastable)
+    Statistics(sizeInBytes = sizeInBytes, hints = child.stats(conf).hints)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a6474667/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala

spark git commit: [SPARK-20848][SQL] Shutdown the pool after reading parquet files

2017-05-24 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 13adc0fc0 -> 2f68631f5


[SPARK-20848][SQL] Shutdown the pool after reading parquet files

## What changes were proposed in this pull request?

From JIRA: On each call to spark.read.parquet, a new ForkJoinPool is created.
One of the threads in the pool is kept in the WAITING state and never stopped,
which leads to unbounded growth in the number of threads.

We should shutdown the pool after reading parquet files.
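
To make the leak and the fix concrete, here is a minimal, self-contained sketch of the underlying pattern (create a dedicated pool, run the parallel work, shut the pool down afterwards), not a line-for-line reproduction of the patch. It assumes the Scala 2.11-era parallel collections API used on this branch (on newer Scala versions the pool class comes from `java.util.concurrent`), and `FooterReadSketch`/`readFooterOf` are illustrative names, not Spark code.

```scala
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

object FooterReadSketch {
  // Hypothetical stand-in for the real per-file footer read.
  private def readFooterOf(path: String): String = s"footer-of-$path"

  def readAllFooters(paths: Seq[String]): Seq[String] = {
    val parPaths = paths.par
    // A dedicated pool instead of the default one, sized like the patch (8 threads).
    val pool = new ForkJoinPool(8)
    parPaths.tasksupport = new ForkJoinTaskSupport(pool)
    try {
      parPaths.map(readFooterOf).seq
    } finally {
      // Without this, every call leaks a pool whose worker threads stay parked forever.
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit =
    readAllFooters(Seq("part-00000.parquet", "part-00001.parquet")).foreach(println)
}
```

The `finally` ensures the pool's worker threads are released even when one of the reads throws.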

## How was this patch tested?

Added a test to ParquetFileFormatSuite.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Liang-Chi Hsieh 

Closes #18073 from viirya/SPARK-20848.

(cherry picked from commit f72ad303f05a6d99513ea3b121375726b177199c)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2f68631f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2f68631f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2f68631f

Branch: refs/heads/branch-2.1
Commit: 2f68631f523e4db08549497d9c3264a43137bbb1
Parents: 13adc0f
Author: Liang-Chi Hsieh 
Authored: Thu May 25 00:35:40 2017 +0800
Committer: Wenchen Fan 
Committed: Thu May 25 00:36:22 2017 +0800

--
 .../sql/execution/datasources/parquet/ParquetFileFormat.scala   | 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2f68631f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 2b4892e..70eb01c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -496,7 +496,8 @@ object ParquetFileFormat extends Logging {
   partFiles: Seq[FileStatus],
   ignoreCorruptFiles: Boolean): Seq[Footer] = {
 val parFiles = partFiles.par
-parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
+val pool = new ForkJoinPool(8)
+parFiles.tasksupport = new ForkJoinTaskSupport(pool)
 parFiles.flatMap { currentFile =>
   try {
 // Skips row group information since we only need the schema.
@@ -512,6 +513,8 @@ object ParquetFileFormat extends Logging {
 } else {
   throw new IOException(s"Could not read footer for file: 
$currentFile", e)
 }
+  } finally {
+pool.shutdown()
   }
 }.seq
   }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-20848][SQL] Shutdown the pool after reading parquet files

2017-05-24 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master bc66a77bb -> f72ad303f


[SPARK-20848][SQL] Shutdown the pool after reading parquet files

## What changes were proposed in this pull request?

From JIRA: On each call to spark.read.parquet, a new ForkJoinPool is created.
One of the threads in the pool is kept in the WAITING state and never stopped,
which leads to unbounded growth in the number of threads.

We should shutdown the pool after reading parquet files.

## How was this patch tested?

Added a test to ParquetFileFormatSuite.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Liang-Chi Hsieh 

Closes #18073 from viirya/SPARK-20848.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f72ad303
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f72ad303
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f72ad303

Branch: refs/heads/master
Commit: f72ad303f05a6d99513ea3b121375726b177199c
Parents: bc66a77
Author: Liang-Chi Hsieh 
Authored: Thu May 25 00:35:40 2017 +0800
Committer: Wenchen Fan 
Committed: Thu May 25 00:35:40 2017 +0800

--
 .../sql/execution/datasources/parquet/ParquetFileFormat.scala   | 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f72ad303/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 2f3a2c6..29ed890 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -479,7 +479,8 @@ object ParquetFileFormat extends Logging {
   partFiles: Seq[FileStatus],
   ignoreCorruptFiles: Boolean): Seq[Footer] = {
 val parFiles = partFiles.par
-parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
+val pool = new ForkJoinPool(8)
+parFiles.tasksupport = new ForkJoinTaskSupport(pool)
 parFiles.flatMap { currentFile =>
   try {
 // Skips row group information since we only need the schema.
@@ -495,6 +496,8 @@ object ParquetFileFormat extends Logging {
 } else {
   throw new IOException(s"Could not read footer for file: 
$currentFile", e)
 }
+  } finally {
+pool.shutdown()
   }
 }.seq
   }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-20862][MLLIB][PYTHON] Avoid passing float to ndarray.reshape in LogisticRegressionModel

2017-05-24 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 4dd34d004 -> 72e1f83d7


[SPARK-20862][MLLIB][PYTHON] Avoid passing float to ndarray.reshape in 
LogisticRegressionModel

## What changes were proposed in this pull request?

Fixed a TypeError seen with Python 3 and NumPy 1.12.1: NumPy's `reshape` no
longer accepts floats as arguments as of 1.12. Also, because Python 3 uses
float division for `/`, we should use `//` to ensure that `_dataWithBiasSize`
doesn't get set to a float.
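
As a hedged illustration (plain NumPy, not the PySpark model code), the sketch below shows both halves of the problem: Python 3's `/` produces a float even when the division is exact, and NumPy 1.12+ rejects float dimensions in `reshape`, whereas `//` keeps the dimension an integer. The variable names are made up for the example.

```python
import numpy as np

coeff = np.arange(6.0)      # pretend these are 6 model coefficients
num_classes = 3             # so the weight matrix has 2 rows of 3 entries

bad = coeff.size / (num_classes - 1)    # 3.0 -- a float under Python 3
good = coeff.size // (num_classes - 1)  # 3   -- stays an int

try:
    coeff.reshape(num_classes - 1, bad)      # TypeError on NumPy >= 1.12
except TypeError as err:
    print("float dimension rejected:", err)

print(coeff.reshape(num_classes - 1, good))  # works: a (2, 3) matrix
```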

## How was this patch tested?

Existing tests run using python3 and numpy 1.12.

Author: Bago Amirbekian 

Closes #18081 from MrBago/BF-py3floatbug.

(cherry picked from commit bc66a77bbe2120cc21bd8da25194efca4cde13c3)
Signed-off-by: Yanbo Liang 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72e1f83d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72e1f83d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72e1f83d

Branch: refs/heads/branch-2.0
Commit: 72e1f83d78e51b53c104d1cd101c10bbe557c047
Parents: 4dd34d0
Author: Bago Amirbekian 
Authored: Wed May 24 22:55:38 2017 +0800
Committer: Yanbo Liang 
Committed: Wed May 24 23:00:01 2017 +0800

--
 python/pyspark/mllib/classification.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/72e1f83d/python/pyspark/mllib/classification.py
--
diff --git a/python/pyspark/mllib/classification.py 
b/python/pyspark/mllib/classification.py
index 9f53ed0..e04eeb2 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -171,7 +171,7 @@ class LogisticRegressionModel(LinearClassificationModel):
 self._dataWithBiasSize = None
 self._weightsMatrix = None
 else:
-self._dataWithBiasSize = self._coeff.size / (self._numClasses - 1)
+self._dataWithBiasSize = self._coeff.size // (self._numClasses - 1)
 self._weightsMatrix = 
self._coeff.toArray().reshape(self._numClasses - 1,
 
self._dataWithBiasSize)
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-20862][MLLIB][PYTHON] Avoid passing float to ndarray.reshape in LogisticRegressionModel

2017-05-24 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 f4538c95f -> 13adc0fc0


[SPARK-20862][MLLIB][PYTHON] Avoid passing float to ndarray.reshape in 
LogisticRegressionModel

## What changes were proposed in this pull request?

Fixed a TypeError seen with Python 3 and NumPy 1.12.1: NumPy's `reshape` no
longer accepts floats as arguments as of 1.12. Also, because Python 3 uses
float division for `/`, we should use `//` to ensure that `_dataWithBiasSize`
doesn't get set to a float.

## How was this patch tested?

Existing tests run using python3 and numpy 1.12.

Author: Bago Amirbekian 

Closes #18081 from MrBago/BF-py3floatbug.

(cherry picked from commit bc66a77bbe2120cc21bd8da25194efca4cde13c3)
Signed-off-by: Yanbo Liang 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/13adc0fc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/13adc0fc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/13adc0fc

Branch: refs/heads/branch-2.1
Commit: 13adc0fc0e940a4ea8b703241666440357a597e3
Parents: f4538c9
Author: Bago Amirbekian 
Authored: Wed May 24 22:55:38 2017 +0800
Committer: Yanbo Liang 
Committed: Wed May 24 22:58:16 2017 +0800

--
 python/pyspark/mllib/classification.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/13adc0fc/python/pyspark/mllib/classification.py
--
diff --git a/python/pyspark/mllib/classification.py 
b/python/pyspark/mllib/classification.py
index 9f53ed0..e04eeb2 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -171,7 +171,7 @@ class LogisticRegressionModel(LinearClassificationModel):
 self._dataWithBiasSize = None
 self._weightsMatrix = None
 else:
-self._dataWithBiasSize = self._coeff.size / (self._numClasses - 1)
+self._dataWithBiasSize = self._coeff.size // (self._numClasses - 1)
 self._weightsMatrix = 
self._coeff.toArray().reshape(self._numClasses - 1,
 
self._dataWithBiasSize)
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-20862][MLLIB][PYTHON] Avoid passing float to ndarray.reshape in LogisticRegressionModel

2017-05-24 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 1d107242f -> 83aeac9e0


[SPARK-20862][MLLIB][PYTHON] Avoid passing float to ndarray.reshape in 
LogisticRegressionModel

## What changes were proposed in this pull request?

Fixed a TypeError seen with Python 3 and NumPy 1.12.1: NumPy's `reshape` no
longer accepts floats as arguments as of 1.12. Also, because Python 3 uses
float division for `/`, we should use `//` to ensure that `_dataWithBiasSize`
doesn't get set to a float.

## How was this patch tested?

Existing tests run using python3 and numpy 1.12.

Author: Bago Amirbekian 

Closes #18081 from MrBago/BF-py3floatbug.

(cherry picked from commit bc66a77bbe2120cc21bd8da25194efca4cde13c3)
Signed-off-by: Yanbo Liang 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83aeac9e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83aeac9e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83aeac9e

Branch: refs/heads/branch-2.2
Commit: 83aeac9e0590e99010d0af8e067822d0ed0971fe
Parents: 1d10724
Author: Bago Amirbekian 
Authored: Wed May 24 22:55:38 2017 +0800
Committer: Yanbo Liang 
Committed: Wed May 24 22:56:28 2017 +0800

--
 python/pyspark/mllib/classification.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/83aeac9e/python/pyspark/mllib/classification.py
--
diff --git a/python/pyspark/mllib/classification.py 
b/python/pyspark/mllib/classification.py
index 9f53ed0..e04eeb2 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -171,7 +171,7 @@ class LogisticRegressionModel(LinearClassificationModel):
 self._dataWithBiasSize = None
 self._weightsMatrix = None
 else:
-self._dataWithBiasSize = self._coeff.size / (self._numClasses - 1)
+self._dataWithBiasSize = self._coeff.size // (self._numClasses - 1)
 self._weightsMatrix = 
self._coeff.toArray().reshape(self._numClasses - 1,
 
self._dataWithBiasSize)
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-20862][MLLIB][PYTHON] Avoid passing float to ndarray.reshape in LogisticRegressionModel

2017-05-24 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/master 1816eb3be -> bc66a77bb


[SPARK-20862][MLLIB][PYTHON] Avoid passing float to ndarray.reshape in 
LogisticRegressionModel

## What changes were proposed in this pull request?

Fixed a TypeError seen with Python 3 and NumPy 1.12.1: NumPy's `reshape` no
longer accepts floats as arguments as of 1.12. Also, because Python 3 uses
float division for `/`, we should use `//` to ensure that `_dataWithBiasSize`
doesn't get set to a float.

## How was this patch tested?

Existing tests run using python3 and numpy 1.12.

Author: Bago Amirbekian 

Closes #18081 from MrBago/BF-py3floatbug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc66a77b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc66a77b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc66a77b

Branch: refs/heads/master
Commit: bc66a77bbe2120cc21bd8da25194efca4cde13c3
Parents: 1816eb3
Author: Bago Amirbekian 
Authored: Wed May 24 22:55:38 2017 +0800
Committer: Yanbo Liang 
Committed: Wed May 24 22:55:38 2017 +0800

--
 python/pyspark/mllib/classification.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bc66a77b/python/pyspark/mllib/classification.py
--
diff --git a/python/pyspark/mllib/classification.py 
b/python/pyspark/mllib/classification.py
index 9f53ed0..e04eeb2 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -171,7 +171,7 @@ class LogisticRegressionModel(LinearClassificationModel):
 self._dataWithBiasSize = None
 self._weightsMatrix = None
 else:
-self._dataWithBiasSize = self._coeff.size / (self._numClasses - 1)
+self._dataWithBiasSize = self._coeff.size // (self._numClasses - 1)
 self._weightsMatrix = 
self._coeff.toArray().reshape(self._numClasses - 1,
 
self._dataWithBiasSize)
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-20631][FOLLOW-UP] Fix incorrect tests.

2017-05-24 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 e936a96ba -> 1d107242f


[SPARK-20631][FOLLOW-UP] Fix incorrect tests.

## What changes were proposed in this pull request?

- Fix incorrect tests for `_check_thresholds`.
- Move test to `ParamTests`.

## How was this patch tested?

Unit tests.

Author: zero323 

Closes #18085 from zero323/SPARK-20631-FOLLOW-UP.

(cherry picked from commit 1816eb3bef930407dc9e083de08f5105725c55d1)
Signed-off-by: Yanbo Liang 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1d107242
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1d107242
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1d107242

Branch: refs/heads/branch-2.2
Commit: 1d107242f8ec842c009e0b427f6e4a8313d99aa2
Parents: e936a96
Author: zero323 
Authored: Wed May 24 19:57:44 2017 +0800
Committer: Yanbo Liang 
Committed: Wed May 24 19:58:40 2017 +0800

--
 python/pyspark/ml/tests.py | 24 
 1 file changed, 12 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1d107242/python/pyspark/ml/tests.py
--
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index a3393c6..0daf29d 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -404,6 +404,18 @@ class ParamTests(PySparkTestCase):
 self.assertEqual(tp._paramMap, copied_no_extra)
 self.assertEqual(tp._defaultParamMap, tp_copy._defaultParamMap)
 
+def test_logistic_regression_check_thresholds(self):
+self.assertIsInstance(
+LogisticRegression(threshold=0.5, thresholds=[0.5, 0.5]),
+LogisticRegression
+)
+
+self.assertRaisesRegexp(
+ValueError,
+"Logistic Regression getThreshold found inconsistent.*$",
+LogisticRegression, threshold=0.42, thresholds=[0.5, 0.5]
+)
+
 
 class EvaluatorTests(SparkSessionTestCase):
 
@@ -807,18 +819,6 @@ class PersistenceTest(SparkSessionTestCase):
 except OSError:
 pass
 
-def logistic_regression_check_thresholds(self):
-self.assertIsInstance(
-LogisticRegression(threshold=0.5, thresholds=[0.5, 0.5]),
-LogisticRegressionModel
-)
-
-self.assertRaisesRegexp(
-ValueError,
-"Logistic Regression getThreshold found inconsistent.*$",
-LogisticRegression, threshold=0.42, thresholds=[0.5, 0.5]
-)
-
 def _compare_params(self, m1, m2, param):
 """
 Compare 2 ML Params instances for the given param, and assert both 
have the same param value


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-20631][FOLLOW-UP] Fix incorrect tests.

2017-05-24 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/master 9afcf127d -> 1816eb3be


[SPARK-20631][FOLLOW-UP] Fix incorrect tests.

## What changes were proposed in this pull request?

- Fix incorrect tests for `_check_thresholds`.
- Move test to `ParamTests`.

## How was this patch tested?

Unit tests.

Author: zero323 

Closes #18085 from zero323/SPARK-20631-FOLLOW-UP.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1816eb3b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1816eb3b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1816eb3b

Branch: refs/heads/master
Commit: 1816eb3bef930407dc9e083de08f5105725c55d1
Parents: 9afcf12
Author: zero323 
Authored: Wed May 24 19:57:44 2017 +0800
Committer: Yanbo Liang 
Committed: Wed May 24 19:57:44 2017 +0800

--
 python/pyspark/ml/tests.py | 24 
 1 file changed, 12 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1816eb3b/python/pyspark/ml/tests.py
--
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index a3393c6..0daf29d 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -404,6 +404,18 @@ class ParamTests(PySparkTestCase):
 self.assertEqual(tp._paramMap, copied_no_extra)
 self.assertEqual(tp._defaultParamMap, tp_copy._defaultParamMap)
 
+def test_logistic_regression_check_thresholds(self):
+self.assertIsInstance(
+LogisticRegression(threshold=0.5, thresholds=[0.5, 0.5]),
+LogisticRegression
+)
+
+self.assertRaisesRegexp(
+ValueError,
+"Logistic Regression getThreshold found inconsistent.*$",
+LogisticRegression, threshold=0.42, thresholds=[0.5, 0.5]
+)
+
 
 class EvaluatorTests(SparkSessionTestCase):
 
@@ -807,18 +819,6 @@ class PersistenceTest(SparkSessionTestCase):
 except OSError:
 pass
 
-def logistic_regression_check_thresholds(self):
-self.assertIsInstance(
-LogisticRegression(threshold=0.5, thresholds=[0.5, 0.5]),
-LogisticRegressionModel
-)
-
-self.assertRaisesRegexp(
-ValueError,
-"Logistic Regression getThreshold found inconsistent.*$",
-LogisticRegression, threshold=0.42, thresholds=[0.5, 0.5]
-)
-
 def _compare_params(self, m1, m2, param):
 """
 Compare 2 ML Params instances for the given param, and assert both 
have the same param value


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-20764][ML][PYSPARK][FOLLOWUP] Fix visibility discrepancy with numInstances and degreesOfFreedom in LR and GLR - Python version

2017-05-24 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 ee9d5975e -> e936a96ba


[SPARK-20764][ML][PYSPARK][FOLLOWUP] Fix visibility discrepancy with 
numInstances and degreesOfFreedom in LR and GLR - Python version

## What changes were proposed in this pull request?
Add test cases for PR-18062

## How was this patch tested?
The existing unit tests.

Author: Peng 

Closes #18068 from mpjlu/moreTest.

(cherry picked from commit 9afcf127d31b5477a539dde6e5f01861532a1c4c)
Signed-off-by: Yanbo Liang 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e936a96b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e936a96b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e936a96b

Branch: refs/heads/branch-2.2
Commit: e936a96badfeeb2051ee35dc4b0fbecefa9bf4cb
Parents: ee9d597
Author: Peng 
Authored: Wed May 24 19:54:17 2017 +0800
Committer: Yanbo Liang 
Committed: Wed May 24 19:54:58 2017 +0800

--
 python/pyspark/ml/tests.py | 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e936a96b/python/pyspark/ml/tests.py
--
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index 51a3e8e..a3393c6 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -1066,6 +1066,7 @@ class TrainingSummaryTest(SparkSessionTestCase):
 self.assertAlmostEqual(s.r2, 1.0, 2)
 self.assertTrue(isinstance(s.residuals, DataFrame))
 self.assertEqual(s.numInstances, 2)
+self.assertEqual(s.degreesOfFreedom, 1)
 devResiduals = s.devianceResiduals
 self.assertTrue(isinstance(devResiduals, list) and 
isinstance(devResiduals[0], float))
 coefStdErr = s.coefficientStandardErrors
@@ -1075,7 +1076,8 @@ class TrainingSummaryTest(SparkSessionTestCase):
 pValues = s.pValues
 self.assertTrue(isinstance(pValues, list) and isinstance(pValues[0], 
float))
 # test evaluation (with training dataset) produces a summary with same 
values
-# one check is enough to verify a summary is returned, Scala version 
runs full test
+# one check is enough to verify a summary is returned
+# The child class LinearRegressionTrainingSummary runs full test
 sameSummary = model.evaluate(df)
 self.assertAlmostEqual(sameSummary.explainedVariance, 
s.explainedVariance)
 
@@ -1093,6 +1095,7 @@ class TrainingSummaryTest(SparkSessionTestCase):
 self.assertEqual(s.numIterations, 1)  # this should default to a 
single iteration of WLS
 self.assertTrue(isinstance(s.predictions, DataFrame))
 self.assertEqual(s.predictionCol, "prediction")
+self.assertEqual(s.numInstances, 2)
 self.assertTrue(isinstance(s.residuals(), DataFrame))
 self.assertTrue(isinstance(s.residuals("pearson"), DataFrame))
 coefStdErr = s.coefficientStandardErrors
@@ -1111,7 +1114,8 @@ class TrainingSummaryTest(SparkSessionTestCase):
 self.assertTrue(isinstance(s.nullDeviance, float))
 self.assertTrue(isinstance(s.dispersion, float))
 # test evaluation (with training dataset) produces a summary with same 
values
-# one check is enough to verify a summary is returned, Scala version 
runs full test
+# one check is enough to verify a summary is returned
+# The child class GeneralizedLinearRegressionTrainingSummary runs full 
test
 sameSummary = model.evaluate(df)
 self.assertAlmostEqual(sameSummary.deviance, s.deviance)
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-20764][ML][PYSPARK][FOLLOWUP] Fix visibility discrepancy with numInstances and degreesOfFreedom in LR and GLR - Python version

2017-05-24 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/master d76633e3c -> 9afcf127d


[SPARK-20764][ML][PYSPARK][FOLLOWUP] Fix visibility discrepancy with 
numInstances and degreesOfFreedom in LR and GLR - Python version

## What changes were proposed in this pull request?
Add test cases for PR-18062

## How was this patch tested?
The existing unit tests.

Author: Peng 

Closes #18068 from mpjlu/moreTest.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9afcf127
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9afcf127
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9afcf127

Branch: refs/heads/master
Commit: 9afcf127d31b5477a539dde6e5f01861532a1c4c
Parents: d76633e
Author: Peng 
Authored: Wed May 24 19:54:17 2017 +0800
Committer: Yanbo Liang 
Committed: Wed May 24 19:54:17 2017 +0800

--
 python/pyspark/ml/tests.py | 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9afcf127/python/pyspark/ml/tests.py
--
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index 51a3e8e..a3393c6 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -1066,6 +1066,7 @@ class TrainingSummaryTest(SparkSessionTestCase):
 self.assertAlmostEqual(s.r2, 1.0, 2)
 self.assertTrue(isinstance(s.residuals, DataFrame))
 self.assertEqual(s.numInstances, 2)
+self.assertEqual(s.degreesOfFreedom, 1)
 devResiduals = s.devianceResiduals
 self.assertTrue(isinstance(devResiduals, list) and 
isinstance(devResiduals[0], float))
 coefStdErr = s.coefficientStandardErrors
@@ -1075,7 +1076,8 @@ class TrainingSummaryTest(SparkSessionTestCase):
 pValues = s.pValues
 self.assertTrue(isinstance(pValues, list) and isinstance(pValues[0], 
float))
 # test evaluation (with training dataset) produces a summary with same 
values
-# one check is enough to verify a summary is returned, Scala version 
runs full test
+# one check is enough to verify a summary is returned
+# The child class LinearRegressionTrainingSummary runs full test
 sameSummary = model.evaluate(df)
 self.assertAlmostEqual(sameSummary.explainedVariance, 
s.explainedVariance)
 
@@ -1093,6 +1095,7 @@ class TrainingSummaryTest(SparkSessionTestCase):
 self.assertEqual(s.numIterations, 1)  # this should default to a 
single iteration of WLS
 self.assertTrue(isinstance(s.predictions, DataFrame))
 self.assertEqual(s.predictionCol, "prediction")
+self.assertEqual(s.numInstances, 2)
 self.assertTrue(isinstance(s.residuals(), DataFrame))
 self.assertTrue(isinstance(s.residuals("pearson"), DataFrame))
 coefStdErr = s.coefficientStandardErrors
@@ -1111,7 +1114,8 @@ class TrainingSummaryTest(SparkSessionTestCase):
 self.assertTrue(isinstance(s.nullDeviance, float))
 self.assertTrue(isinstance(s.dispersion, float))
 # test evaluation (with training dataset) produces a summary with same 
values
-# one check is enough to verify a summary is returned, Scala version 
runs full test
+# one check is enough to verify a summary is returned
+# The child class GeneralizedLinearRegressionTrainingSummary runs full 
test
 sameSummary = model.evaluate(df)
 self.assertAlmostEqual(sameSummary.deviance, s.deviance)
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-18406][CORE] Race between end-of-task and completion iterator read lock release

2017-05-24 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 9434280cf -> d76633e3c


[SPARK-18406][CORE] Race between end-of-task and completion iterator read lock 
release

## What changes were proposed in this pull request?

When a TaskContext is not propagated properly to all child threads of a task,
as in the cases reported in this issue, we fail to get the TID from the
TaskContext, so the lock cannot be released and assertions fail. To resolve
this, we have to pass the TID value to the `unlock` method explicitly.
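
A rough sketch of the pattern behind the fix, not Spark's actual `BlockInfoManager`: the release method takes an optional explicit task attempt ID that overrides the thread-local lookup, so a callback running on a thread without a propagated context can still release locks on behalf of the task that acquired them. All names below are illustrative.

```scala
// Minimal sketch of the "explicit TID" pattern; names are hypothetical.
object LockReleaseSketch {
  // Stand-in for TaskContext: a thread-local carrying the current task attempt ID.
  private val currentTaskAttemptId = new ThreadLocal[java.lang.Long]

  def unlock(blockId: String, taskAttemptId: Option[Long] = None): Unit = {
    // Prefer the explicitly passed ID; fall back to the thread-local if it is set.
    val taskId = taskAttemptId
      .orElse(Option(currentTaskAttemptId.get).map(_.longValue))
      .getOrElse(sys.error(s"no task attempt ID available to release $blockId"))
    println(s"Task $taskId releasing lock for $blockId")
  }

  def main(args: Array[String]): Unit = {
    currentTaskAttemptId.set(7L)
    val tid = Option(currentTaskAttemptId.get).map(_.longValue) // captured on the task thread

    unlock("rdd_0_0") // task thread: the thread-local suffices
    val child = new Thread(new Runnable {
      // Child thread: the thread-local is empty here, so the captured TID is passed explicitly.
      override def run(): Unit = unlock("rdd_0_0", tid)
    })
    child.start()
    child.join()
  }
}
```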

## How was this patch tested?

Add new failing regression test case in `RDDSuite`.

Author: Xingbo Jiang 

Closes #18076 from jiangxb1987/completion-iterator.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d76633e3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d76633e3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d76633e3

Branch: refs/heads/master
Commit: d76633e3cad341b9efa23629f33c5ce90993d6d4
Parents: 9434280
Author: Xingbo Jiang 
Authored: Wed May 24 15:43:23 2017 +0800
Committer: Wenchen Fan 
Committed: Wed May 24 15:43:23 2017 +0800

--
 .../apache/spark/network/BlockDataManager.scala |  2 +-
 .../apache/spark/storage/BlockInfoManager.scala | 15 ++
 .../org/apache/spark/storage/BlockManager.scala | 29 ++--
 .../scala/org/apache/spark/rdd/RDDSuite.scala   | 18 +++-
 4 files changed, 49 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d76633e3/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala 
b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
index 8f83668..b3f8bfe 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
@@ -46,5 +46,5 @@ trait BlockDataManager {
   /**
* Release locks acquired by [[putBlockData()]] and [[getBlockData()]].
*/
-  def releaseLock(blockId: BlockId): Unit
+  def releaseLock(blockId: BlockId, taskAttemptId: Option[Long]): Unit
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d76633e3/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 3db5983..7064872 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -281,22 +281,27 @@ private[storage] class BlockInfoManager extends Logging {
 
   /**
* Release a lock on the given block.
+   * In case a TaskContext is not propagated properly to all child threads for 
the task, we fail to
+   * get the TID from TaskContext, so we have to explicitly pass the TID value 
to release the lock.
+   *
+   * See SPARK-18406 for more discussion of this issue.
*/
-  def unlock(blockId: BlockId): Unit = synchronized {
-logTrace(s"Task $currentTaskAttemptId releasing lock for $blockId")
+  def unlock(blockId: BlockId, taskAttemptId: Option[TaskAttemptId] = None): 
Unit = synchronized {
+val taskId = taskAttemptId.getOrElse(currentTaskAttemptId)
+logTrace(s"Task $taskId releasing lock for $blockId")
 val info = get(blockId).getOrElse {
   throw new IllegalStateException(s"Block $blockId not found")
 }
 if (info.writerTask != BlockInfo.NO_WRITER) {
   info.writerTask = BlockInfo.NO_WRITER
-  writeLocksByTask.removeBinding(currentTaskAttemptId, blockId)
+  writeLocksByTask.removeBinding(taskId, blockId)
 } else {
   assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
   info.readerCount -= 1
-  val countsForTask = readLocksByTask(currentTaskAttemptId)
+  val countsForTask = readLocksByTask(taskId)
   val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1
   assert(newPinCountForTask >= 0,
-s"Task $currentTaskAttemptId release lock on block $blockId more times 
than it acquired it")
+s"Task $taskId release lock on block $blockId more times than it 
acquired it")
 }
 notifyAll()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/d76633e3/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 

spark git commit: [SPARK-18406][CORE] Race between end-of-task and completion iterator read lock release

2017-05-24 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 00dee3902 -> ee9d5975e


[SPARK-18406][CORE] Race between end-of-task and completion iterator read lock 
release

## What changes were proposed in this pull request?

When a TaskContext is not propagated properly to all child threads of a task,
as in the cases reported in this issue, we fail to get the TID from the
TaskContext, so the lock cannot be released and assertions fail. To resolve
this, we have to pass the TID value to the `unlock` method explicitly.

## How was this patch tested?

Add new failing regression test case in `RDDSuite`.

Author: Xingbo Jiang 

Closes #18076 from jiangxb1987/completion-iterator.

(cherry picked from commit d76633e3cad341b9efa23629f33c5ce90993d6d4)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee9d5975
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee9d5975
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee9d5975

Branch: refs/heads/branch-2.2
Commit: ee9d5975e6dbc5cb1dfe498870f94b1d760098db
Parents: 00dee39
Author: Xingbo Jiang 
Authored: Wed May 24 15:43:23 2017 +0800
Committer: Wenchen Fan 
Committed: Wed May 24 15:43:38 2017 +0800

--
 .../apache/spark/network/BlockDataManager.scala |  2 +-
 .../apache/spark/storage/BlockInfoManager.scala | 15 ++
 .../org/apache/spark/storage/BlockManager.scala | 29 ++--
 .../scala/org/apache/spark/rdd/RDDSuite.scala   | 18 +++-
 4 files changed, 49 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ee9d5975/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala 
b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
index 8f83668..b3f8bfe 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
@@ -46,5 +46,5 @@ trait BlockDataManager {
   /**
* Release locks acquired by [[putBlockData()]] and [[getBlockData()]].
*/
-  def releaseLock(blockId: BlockId): Unit
+  def releaseLock(blockId: BlockId, taskAttemptId: Option[Long]): Unit
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ee9d5975/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 3db5983..7064872 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -281,22 +281,27 @@ private[storage] class BlockInfoManager extends Logging {
 
   /**
* Release a lock on the given block.
+   * In case a TaskContext is not propagated properly to all child threads for 
the task, we fail to
+   * get the TID from TaskContext, so we have to explicitly pass the TID value 
to release the lock.
+   *
+   * See SPARK-18406 for more discussion of this issue.
*/
-  def unlock(blockId: BlockId): Unit = synchronized {
-logTrace(s"Task $currentTaskAttemptId releasing lock for $blockId")
+  def unlock(blockId: BlockId, taskAttemptId: Option[TaskAttemptId] = None): 
Unit = synchronized {
+val taskId = taskAttemptId.getOrElse(currentTaskAttemptId)
+logTrace(s"Task $taskId releasing lock for $blockId")
 val info = get(blockId).getOrElse {
   throw new IllegalStateException(s"Block $blockId not found")
 }
 if (info.writerTask != BlockInfo.NO_WRITER) {
   info.writerTask = BlockInfo.NO_WRITER
-  writeLocksByTask.removeBinding(currentTaskAttemptId, blockId)
+  writeLocksByTask.removeBinding(taskId, blockId)
 } else {
   assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
   info.readerCount -= 1
-  val countsForTask = readLocksByTask(currentTaskAttemptId)
+  val countsForTask = readLocksByTask(taskId)
   val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1
   assert(newPinCountForTask >= 0,
-s"Task $currentTaskAttemptId release lock on block $blockId more times 
than it acquired it")
+s"Task $taskId release lock on block $blockId more times than it 
acquired it")
 }
 notifyAll()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ee9d5975/core/src/main/scala/org/apache/spark/storage/BlockManager.scala