spark git commit: [SPARK-20392][SQL] Set barrier to prevent re-entering a tree

2017-05-25 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master f47700c9c -> 8ce0d8ffb


[SPARK-20392][SQL] Set barrier to prevent re-entering a tree

## What changes were proposed in this pull request?

It is reported that there is a performance regression when applying an ML 
pipeline to a dataset with many columns but few rows.

A big part of the regression comes from operations (e.g., `select`) on 
DataFrame/Dataset that re-create a new DataFrame/Dataset with a new 
`LogicalPlan`. In normal SQL usage this cost is negligible.

However, it is not rare to chain dozens of pipeline stages in ML. As the query 
plan grows incrementally while running those stages, the total cost spent on 
re-creating DataFrames grows too. In particular, the `Analyzer` traverses the 
whole large query plan even though most of it has already been analyzed.

By eliminating part of this cost, the time to run the example code locally is 
reduced from about 1 min to about 30 secs.

In particular, the time to apply the pipeline locally is mostly spent calling 
`transform` on the 137 `Bucketizer`s. Before the change, each call to a 
`Bucketizer`'s `transform` costs about 0.4 sec, so the total time spent in all 
`Bucketizer` transforms is about 50 secs. After the change, each call only 
costs about 0.1 sec.

We also make `boundEnc` a lazy val to reduce unnecessary running time.
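
Below is a minimal, hypothetical sketch of the barrier idea with simplified 
placeholder types (this is not Catalyst's actual `LogicalPlan`/rule API): an 
already-analyzed subtree is wrapped in a barrier node, and rule application 
stops at the barrier instead of re-walking the whole analyzed plan.

```
// Illustrative sketch only: simplified placeholder plan types, not Catalyst's API.
sealed trait Plan { def children: Seq[Plan] }
case class Relation(name: String) extends Plan { val children: Seq[Plan] = Nil }
case class Select(child: Plan, cols: Seq[String]) extends Plan {
  val children: Seq[Plan] = Seq(child)
}
// A barrier wraps a subtree that is already analyzed, so analysis rules skip it.
case class Barrier(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }

// Bottom-up rule application that does not re-enter barrier-wrapped subtrees.
def resolveOperators(plan: Plan)(rule: PartialFunction[Plan, Plan]): Plan = plan match {
  case b: Barrier => b  // already analyzed: stop here instead of re-walking the subtree
  case s: Select  =>
    val resolvedChild = resolveOperators(s.child)(rule)
    rule.applyOrElse(s.copy(child = resolvedChild), identity[Plan])
  case other      => rule.applyOrElse(other, identity[Plan])
}

// Each Dataset operation wraps its analyzed plan in a barrier before building on
// it, so the next analysis pass only has to look at the new top-level operator.
val analyzed: Plan = Relation("t")
val nextPlan = Select(Barrier(analyzed), Seq("a", "b"))
resolveOperators(nextPlan) { case s: Select => s /* resolve columns, etc. */ }
```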

### Performance improvement

The code and datasets provided by Barry Becker to reproduce this issue and 
benchmark it can be found on the JIRA.

Before this patch: about 1 min
After this patch: about 20 secs

## How was this patch tested?

Existing tests.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Liang-Chi Hsieh 

Closes #17770 from viirya/SPARK-20392.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8ce0d8ff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8ce0d8ff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8ce0d8ff

Branch: refs/heads/master
Commit: 8ce0d8ffb68bd9e89c23d3a026308dcc039a1b1d
Parents: f47700c
Author: Liang-Chi Hsieh 
Authored: Fri May 26 13:45:55 2017 +0800
Committer: Wenchen Fan 
Committed: Fri May 26 13:45:55 2017 +0800

--
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 75 ++--
 .../catalyst/analysis/DecimalPrecision.scala|  2 +-
 .../analysis/ResolveTableValuedFunctions.scala  |  2 +-
 .../sql/catalyst/analysis/TypeCoercion.scala| 22 ++---
 .../catalyst/analysis/timeZoneAnalysis.scala|  2 +-
 .../spark/sql/catalyst/analysis/view.scala  |  2 +-
 .../spark/sql/catalyst/optimizer/subquery.scala |  2 +-
 .../catalyst/plans/logical/LogicalPlan.scala| 35 
 .../plans/logical/basicLogicalOperators.scala   |  9 ++
 .../sql/catalyst/analysis/AnalysisSuite.scala   | 14 +++
 .../sql/catalyst/plans/LogicalPlanSuite.scala   | 26 +++---
 .../scala/org/apache/spark/sql/Dataset.scala| 92 ++--
 .../sql/execution/datasources/DataSource.scala  |  2 +-
 .../spark/sql/execution/datasources/rules.scala |  2 +-
 .../spark/sql/execution/PlannerSuite.scala  |  2 +-
 .../apache/spark/sql/hive/HiveStrategies.scala  |  6 +-
 16 files changed, 151 insertions(+), 144 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8ce0d8ff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d130962..85cf8dd 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -166,14 +166,15 @@ class Analyzer(
 Batch("Subquery", Once,
   UpdateOuterReferences),
 Batch("Cleanup", fixedPoint,
-  CleanupAliases)
+  CleanupAliases,
+  EliminateBarriers)
   )
 
   /**
* Analyze cte definitions and substitute child plan with analyzed cte 
definitions.
*/
   object CTESubstitution extends Rule[LogicalPlan] {
-def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators  {
+def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
   case With(child, relations) =>
 substituteCTE(child, relations.foldLeft(Seq.empty[(String, 
LogicalPlan)]) {
   case (resolved, (name, relation)) =>
@@ -201,7 +202,7 @@ class Analyzer(
* Substitute child plan with WindowSpecDefinitions.
*/
   object WindowsSubstitution extends Rule[LogicalPlan] {
-def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators 

spark git commit: [SPARK-14659][ML] RFormula consistent with R when handling strings

2017-05-25 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/master 2dbe0c528 -> f47700c9c


[SPARK-14659][ML] RFormula consistent with R when handling strings

## What changes were proposed in this pull request?
When handling strings, the category dropped by RFormula differs from the one 
dropped by R:
- RFormula drops the least frequent level
- R drops the first level after ascending alphabetical ordering

This PR builds on the string ordering types added to StringIndexer in #17879 so 
that RFormula can drop the same level as R when handling strings, using 
`stringOrderType = "alphabetDesc"`.
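
A usage sketch, assuming the new `stringIndexerOrderType` param and its setter 
added by this patch (`@Since("2.3.0")`) are available on `RFormula`; the data 
and column names are made up:

```
// Made-up data; assumes Spark 2.3+ where RFormula exposes stringIndexerOrderType.
import org.apache.spark.ml.feature.RFormula
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("rformula-order").getOrCreate()
import spark.implicits._

val df = Seq(("b", 1.0, 0.0), ("a", 2.0, 1.0), ("c", 3.0, 0.0), ("b", 4.0, 1.0))
  .toDF("category", "value", "label")

val formula = new RFormula()
  .setFormula("label ~ category + value")
  .setStringIndexerOrderType("alphabetDesc") // drop the same reference level as R
formula.fit(df).transform(df).show(truncate = false)
```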

## How was this patch tested?
new tests

Author: Wayne Zhang 

Closes #17967 from actuaryzhang/RFormula.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f47700c9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f47700c9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f47700c9

Branch: refs/heads/master
Commit: f47700c9cadd72a2495f97f250790449705f631f
Parents: 2dbe0c5
Author: Wayne Zhang 
Authored: Fri May 26 10:44:40 2017 +0800
Committer: Yanbo Liang 
Committed: Fri May 26 10:44:40 2017 +0800

--
 .../org/apache/spark/ml/feature/RFormula.scala  | 44 +-
 .../apache/spark/ml/feature/StringIndexer.scala |  4 +-
 .../apache/spark/ml/feature/RFormulaSuite.scala | 84 
 3 files changed, 129 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f47700c9/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
index 5a3e292..1fad0a6 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
@@ -26,7 +26,7 @@ import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.ml.{Estimator, Model, Pipeline, PipelineModel, 
PipelineStage, Transformer}
 import org.apache.spark.ml.attribute.AttributeGroup
 import org.apache.spark.ml.linalg.VectorUDT
-import org.apache.spark.ml.param.{BooleanParam, Param, ParamMap}
+import org.apache.spark.ml.param.{BooleanParam, Param, ParamMap, ParamValidators}
 import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasLabelCol}
 import org.apache.spark.ml.util._
 import org.apache.spark.sql.{DataFrame, Dataset}
@@ -37,6 +37,42 @@ import org.apache.spark.sql.types._
  */
 private[feature] trait RFormulaBase extends HasFeaturesCol with HasLabelCol {
 
+  /**
+   * Param for how to order categories of a string FEATURE column used by `StringIndexer`.
+   * The last category after ordering is dropped when encoding strings.
+   * Supported options: 'frequencyDesc', 'frequencyAsc', 'alphabetDesc', 'alphabetAsc'.
+   * The default value is 'frequencyDesc'. When the ordering is set to 'alphabetDesc', `RFormula`
+   * drops the same category as R when encoding strings.
+   *
+   * The options are explained using an example `'b', 'a', 'b', 'a', 'c', 'b'`:
+   * {{{
+   * +-----------------+---------------------------------------+----------------------------------+
+   * |      Option     | Category mapped to 0 by StringIndexer |   Category dropped by RFormula   |
+   * +-----------------+---------------------------------------+----------------------------------+
+   * | 'frequencyDesc' | most frequent category ('b')          | least frequent category ('c')    |
+   * | 'frequencyAsc'  | least frequent category ('c')         | most frequent category ('b')     |
+   * | 'alphabetDesc'  | last alphabetical category ('c')      | first alphabetical category ('a')|
+   * | 'alphabetAsc'   | first alphabetical category ('a')     | last alphabetical category ('c') |
+   * +-----------------+---------------------------------------+----------------------------------+
+   * }}}
+   * Note that this ordering option is NOT used for the label column. When the label column is
+   * indexed, it uses the default descending frequency ordering in `StringIndexer`.
+   *
+   * @group param
+   */
+  @Since("2.3.0")
+  final val stringIndexerOrderType: Param[String] = new Param(this, "stringIndexerOrderType",
+    "How to order categories of a string FEATURE column used by StringIndexer. " +
+    "The last category after ordering is dropped when encoding strings. " +
+    s"Supported options: ${StringIndexer.supportedStringOrderType.mkString(", ")}. " +
+    "The default value is 'frequencyDesc'. When the ordering is set to 'alphabetDesc', " +
+    "RFormula drops the same category as R when encoding strings.",
+

spark git commit: [SPARK-20775][SQL] Added scala support from_json

2017-05-25 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master c1e7989c4 -> 2dbe0c528


[SPARK-20775][SQL] Added scala support from_json

## What changes were proposed in this pull request?

The `from_json` function that takes an options map required a 
`java.util.HashMap`. For other functions, a Java wrapper is provided which 
converts a Java map to a Scala map; in this case only the Java variant existed, 
forcing Scala users to pass in a `java.util.HashMap`.

Added the missing Scala wrapper.
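
A usage sketch of the new Scala-specific overload, mirroring the Scala test 
added in this patch (the JSON data and empty options map are illustrative):

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json

val spark = SparkSession.builder().master("local[*]").appName("from-json").getOrCreate()
import spark.implicits._

// Pass a plain Scala Map for the options instead of a java.util.HashMap.
val df = Seq("""{"a": 1, "b": "haa"}""").toDS()
df.select(from_json($"value", "a INT, b STRING", Map.empty[String, String])).show()
```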

## How was this patch tested?
Added a unit test for passing in a scala map

Author: setjet 

Closes #18094 from setjet/spark-20775.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2dbe0c52
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2dbe0c52
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2dbe0c52

Branch: refs/heads/master
Commit: 2dbe0c5288b48733bae0e39a6c5d8047f4a55088
Parents: c1e7989
Author: setjet 
Authored: Fri May 26 10:21:39 2017 +0800
Committer: Wenchen Fan 
Committed: Fri May 26 10:21:39 2017 +0800

--
 .../scala/org/apache/spark/sql/functions.scala  | 22 ++--
 .../apache/spark/sql/JsonFunctionsSuite.scala   |  9 +++-
 2 files changed, 28 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2dbe0c52/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 36c0f18..7eea6d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3060,8 +3060,9 @@ object functions {
 from_json(e, schema, Map.empty[String, String])
 
   /**
-   * Parses a column containing a JSON string into a `StructType` or 
`ArrayType` of `StructType`s
-   * with the specified schema. Returns `null`, in the case of an unparseable 
string.
+   * (Java-specific) Parses a column containing a JSON string into a 
`StructType` or `ArrayType`
+   * of `StructType`s with the specified schema. Returns `null`, in the case 
of an unparseable
+   * string.
*
* @param e a string column containing JSON data.
* @param schema the schema to use when parsing the json string as a json 
string. In Spark 2.1,
@@ -3072,6 +3073,23 @@ object functions {
* @since 2.1.0
*/
   def from_json(e: Column, schema: String, options: java.util.Map[String, 
String]): Column = {
+from_json(e, schema, options.asScala.toMap)
+  }
+
+  /**
+   * (Scala-specific) Parses a column containing a JSON string into a 
`StructType` or `ArrayType`
+   * of `StructType`s with the specified schema. Returns `null`, in the case 
of an unparseable
+   * string.
+   *
+   * @param e a string column containing JSON data.
+   * @param schema the schema to use when parsing the json string as a json 
string. In Spark 2.1,
+   *   the user-provided schema has to be in JSON format. Since 
Spark 2.2, the DDL
+   *   format is also supported for the schema.
+   *
+   * @group collection_funcs
+   * @since 2.3.0
+   */
+  def from_json(e: Column, schema: String, options: Map[String, String]): 
Column = {
 val dataType = try {
   DataType.fromJson(schema)
 } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/2dbe0c52/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 69a500c..cf2d00f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -156,13 +156,20 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSQLContext {
   Row(Seq(Row(1, "a"), Row(2, null), Row(null, null
   }
 
-  test("from_json uses DDL strings for defining a schema") {
+  test("from_json uses DDL strings for defining a schema - java") {
 val df = Seq("""{"a": 1, "b": "haa"}""").toDS()
 checkAnswer(
   df.select(from_json($"value", "a INT, b STRING", new 
java.util.HashMap[String, String]())),
   Row(Row(1, "haa")) :: Nil)
   }
 
+  test("from_json uses DDL strings for defining a schema - scala") {
+val df = Seq("""{"a": 1, "b": "haa"}""").toDS()
+checkAnswer(
+  df.select(from_json($"value", "a INT, b STRING", Map[String, String]())),
+  Row(Row(1, "haa")) :: Nil)
+  }
+
   test("to_json - struct") {
 val df = Seq(Tuple1(Tuple1(1))).toDF("a")
 



spark git commit: [SPARK-20888][SQL][DOCS] Document change of default setting of spark.sql.hive.caseSensitiveInferenceMode

2017-05-25 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 7a21de9e2 -> 289dd170c


[SPARK-20888][SQL][DOCS] Document change of default setting of 
spark.sql.hive.caseSensitiveInferenceMode

(Link to Jira: https://issues.apache.org/jira/browse/SPARK-20888)

## What changes were proposed in this pull request?

Document the change of the default setting of the 
spark.sql.hive.caseSensitiveInferenceMode configuration key from NEVER_INFER to 
INFER_AND_SAVE in the Spark SQL 2.1 to 2.2 migration notes.
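
As a hedged sketch of the opt-out described in the migration note below (keep 
the pre-2.2 behavior when mixed-case column-name compatibility is not needed; 
the app name here is arbitrary):

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("example")                 // arbitrary name for this sketch
  .enableHiveSupport()
  // Restore the pre-2.2 default if mixed-case column names are not a concern,
  // avoiding the one-time schema inference cost on first table access.
  .config("spark.sql.hive.caseSensitiveInferenceMode", "NEVER_INFER")
  .getOrCreate()
```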

Author: Michael Allman 

Closes #18112 from mallman/spark-20888-document_infer_and_save.

(cherry picked from commit c1e7989c4ffd83c51f5c97998b4ff6fe8dd83cf4)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/289dd170
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/289dd170
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/289dd170

Branch: refs/heads/branch-2.2
Commit: 289dd170cb3e0b9eca9af5841a0155ceaffee447
Parents: 7a21de9
Author: Michael Allman 
Authored: Fri May 26 09:25:43 2017 +0800
Committer: Wenchen Fan 
Committed: Fri May 26 09:26:16 2017 +0800

--
 docs/sql-programming-guide.md | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/289dd170/docs/sql-programming-guide.md
--
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 490c1ce..adb12d2 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1223,7 +1223,7 @@ the following case-insensitive options:
  This is a JDBC writer related option. If specified, this option allows 
setting of database-specific table and partition options when creating a table 
(e.g., CREATE TABLE t (name string) ENGINE=InnoDB.). This option 
applies only to writing.

   
-  
+
   
 createTableColumnTypes
 
@@ -1444,6 +1444,10 @@ options.
 
 # Migration Guide
 
+## Upgrading From Spark SQL 2.1 to 2.2
+
+  - Spark 2.1.1 introduced a new configuration key: 
`spark.sql.hive.caseSensitiveInferenceMode`. It had a default setting of 
`NEVER_INFER`, which kept behavior identical to 2.1.0. However, Spark 2.2.0 
changes this setting's default value to `INFER_AND_SAVE` to restore 
compatibility with reading Hive metastore tables whose underlying file schema 
have mixed-case column names. With the `INFER_AND_SAVE` configuration value, on 
first access Spark will perform schema inference on any Hive metastore table 
for which it has not already saved an inferred schema. Note that schema 
inference can be a very time consuming operation for tables with thousands of 
partitions. If compatibility with mixed-case column names is not a concern, you 
can safely set `spark.sql.hive.caseSensitiveInferenceMode` to `NEVER_INFER` to 
avoid the initial overhead of schema inference. Note that with the new default 
`INFER_AND_SAVE` setting, the results of the schema inference are saved as a 
metastore key for future use
 . Therefore, the initial schema inference occurs only at a table's first 
access.
+
 ## Upgrading From Spark SQL 2.0 to 2.1
 
  - Datasource tables now store partition metadata in the Hive metastore. This 
means that Hive DDLs such as `ALTER TABLE PARTITION ... SET LOCATION` are now 
available for tables created with the Datasource API.


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-20888][SQL][DOCS] Document change of default setting of spark.sql.hive.caseSensitiveInferenceMode

2017-05-25 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 98c385298 -> c1e7989c4


[SPARK-20888][SQL][DOCS] Document change of default setting of 
spark.sql.hive.caseSensitiveInferenceMode

(Link to Jira: https://issues.apache.org/jira/browse/SPARK-20888)

## What changes were proposed in this pull request?

Document the change of the default setting of the 
spark.sql.hive.caseSensitiveInferenceMode configuration key from NEVER_INFER to 
INFER_AND_SAVE in the Spark SQL 2.1 to 2.2 migration notes.

Author: Michael Allman 

Closes #18112 from mallman/spark-20888-document_infer_and_save.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1e7989c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1e7989c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1e7989c

Branch: refs/heads/master
Commit: c1e7989c4ffd83c51f5c97998b4ff6fe8dd83cf4
Parents: 98c3852
Author: Michael Allman 
Authored: Fri May 26 09:25:43 2017 +0800
Committer: Wenchen Fan 
Committed: Fri May 26 09:25:43 2017 +0800

--
 docs/sql-programming-guide.md | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c1e7989c/docs/sql-programming-guide.md
--
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 490c1ce..adb12d2 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1223,7 +1223,7 @@ the following case-insensitive options:
  This is a JDBC writer related option. If specified, this option allows 
setting of database-specific table and partition options when creating a table 
(e.g., CREATE TABLE t (name string) ENGINE=InnoDB.). This option 
applies only to writing.

   
-  
+
   
 createTableColumnTypes
 
@@ -1444,6 +1444,10 @@ options.
 
 # Migration Guide
 
+## Upgrading From Spark SQL 2.1 to 2.2
+
+  - Spark 2.1.1 introduced a new configuration key: 
`spark.sql.hive.caseSensitiveInferenceMode`. It had a default setting of 
`NEVER_INFER`, which kept behavior identical to 2.1.0. However, Spark 2.2.0 
changes this setting's default value to `INFER_AND_SAVE` to restore 
compatibility with reading Hive metastore tables whose underlying file schema 
have mixed-case column names. With the `INFER_AND_SAVE` configuration value, on 
first access Spark will perform schema inference on any Hive metastore table 
for which it has not already saved an inferred schema. Note that schema 
inference can be a very time consuming operation for tables with thousands of 
partitions. If compatibility with mixed-case column names is not a concern, you 
can safely set `spark.sql.hive.caseSensitiveInferenceMode` to `NEVER_INFER` to 
avoid the initial overhead of schema inference. Note that with the new default 
`INFER_AND_SAVE` setting, the results of the schema inference are saved as a 
metastore key for future use
 . Therefore, the initial schema inference occurs only at a table's first 
access.
+
 ## Upgrading From Spark SQL 2.0 to 2.1
 
  - Datasource tables now store partition metadata in the Hive metastore. This 
means that Hive DDLs such as `ALTER TABLE PARTITION ... SET LOCATION` are now 
available for tables created with the Datasource API.


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-20874][EXAMPLES] Add Structured Streaming Kafka Source to examples project

2017-05-25 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 7fc2347b5 -> 4f6fccf15


[SPARK-20874][EXAMPLES] Add Structured Streaming Kafka Source to examples 
project

## What changes were proposed in this pull request?

Add Structured Streaming Kafka Source to the `examples` project so that people 
can run `bin/run-example StructuredKafkaWordCount ...`.

## How was this patch tested?

manually tested it.

Author: Shixiong Zhu 

Closes #18101 from zsxwing/add-missing-example-dep.

(cherry picked from commit 98c3852986a2cb5f2d249d6c8ef602be283bd90e)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f6fccf1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f6fccf1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f6fccf1

Branch: refs/heads/branch-2.1
Commit: 4f6fccf15d40da503a7c6a8722058c38d57178cc
Parents: 7fc2347
Author: Shixiong Zhu 
Authored: Thu May 25 10:49:14 2017 -0700
Committer: Shixiong Zhu 
Committed: Thu May 25 10:49:32 2017 -0700

--
 examples/pom.xml | 6 ++
 1 file changed, 6 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4f6fccf1/examples/pom.xml
--
diff --git a/examples/pom.xml b/examples/pom.xml
index 8fa731f..f17e605 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -91,6 +91,12 @@
   provided
 
 
+  org.apache.spark
+  spark-sql-kafka-0-10_${scala.binary.version}
+  ${project.version}
+  provided
+
+
   org.apache.commons
   commons-math3
   provided


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-20874][EXAMPLES] Add Structured Streaming Kafka Source to examples project

2017-05-25 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 5ae1c6521 -> 7a21de9e2


[SPARK-20874][EXAMPLES] Add Structured Streaming Kafka Source to examples 
project

## What changes were proposed in this pull request?

Add Structured Streaming Kafka Source to the `examples` project so that people 
can run `bin/run-example StructuredKafkaWordCount ...`.

## How was this patch tested?

manually tested it.

Author: Shixiong Zhu 

Closes #18101 from zsxwing/add-missing-example-dep.

(cherry picked from commit 98c3852986a2cb5f2d249d6c8ef602be283bd90e)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a21de9e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a21de9e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a21de9e

Branch: refs/heads/branch-2.2
Commit: 7a21de9e2bb0d9344a371a8570b2fffa68c3236e
Parents: 5ae1c65
Author: Shixiong Zhu 
Authored: Thu May 25 10:49:14 2017 -0700
Committer: Shixiong Zhu 
Committed: Thu May 25 10:49:23 2017 -0700

--
 examples/pom.xml | 6 ++
 1 file changed, 6 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7a21de9e/examples/pom.xml
--
diff --git a/examples/pom.xml b/examples/pom.xml
index aa91e98..0d001ee 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -91,6 +91,12 @@
   provided
 
 
+  org.apache.spark
+  spark-sql-kafka-0-10_${scala.binary.version}
+  ${project.version}
+  provided
+
+
   org.apache.commons
   commons-math3
   provided


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-20874][EXAMPLES] Add Structured Streaming Kafka Source to examples project

2017-05-25 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master e9f983df2 -> 98c385298


[SPARK-20874][EXAMPLES] Add Structured Streaming Kafka Source to examples 
project

## What changes were proposed in this pull request?

Add Structured Streaming Kafka Source to the `examples` project so that people 
can run `bin/run-example StructuredKafkaWordCount ...`.

## How was this patch tested?

manually tested it.

Author: Shixiong Zhu 

Closes #18101 from zsxwing/add-missing-example-dep.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/98c38529
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/98c38529
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/98c38529

Branch: refs/heads/master
Commit: 98c3852986a2cb5f2d249d6c8ef602be283bd90e
Parents: e9f983d
Author: Shixiong Zhu 
Authored: Thu May 25 10:49:14 2017 -0700
Committer: Shixiong Zhu 
Committed: Thu May 25 10:49:14 2017 -0700

--
 examples/pom.xml | 6 ++
 1 file changed, 6 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/98c38529/examples/pom.xml
--
diff --git a/examples/pom.xml b/examples/pom.xml
index e674e79..81af735 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -91,6 +91,12 @@
   provided
 
 
+  org.apache.spark
+  spark-sql-kafka-0-10_${scala.binary.version}
+  ${project.version}
+  provided
+
+
   org.apache.commons
   commons-math3
   provided


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-19707][SPARK-18922][TESTS][SQL][CORE] Fix test failures/the invalid path check for sc.addJar on Windows

2017-05-25 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 7306d5569 -> e9f983df2


[SPARK-19707][SPARK-18922][TESTS][SQL][CORE] Fix test failures/the invalid path 
check for sc.addJar on Windows

## What changes were proposed in this pull request?

This PR proposes two things:

- A follow up for SPARK-19707 (Improving the invalid path check for sc.addJar 
on Windows as well).

```
org.apache.spark.SparkContextSuite:
 - add jar with invalid path *** FAILED *** (32 milliseconds)
   2 was not equal to 1 (SparkContextSuite.scala:309)
   ...
```

- Fix path vs URI related test failures on Windows.

```
org.apache.spark.storage.LocalDirsSuite:
 - SPARK_LOCAL_DIRS override also affects driver *** FAILED *** (0 milliseconds)
   new java.io.File("/NONEXISTENT_PATH").exists() was true 
(LocalDirsSuite.scala:50)
   ...

 - Utils.getLocalDir() throws an exception if any temporary directory cannot be 
retrieved *** FAILED *** (15 milliseconds)
   Expected exception java.io.IOException to be thrown, but no exception was 
thrown. (LocalDirsSuite.scala:64)
   ...
```

```
org.apache.spark.sql.hive.HiveSchemaInferenceSuite:
 - orc: schema should be inferred and saved when INFER_AND_SAVE is specified 
*** FAILED *** (203 milliseconds)
   java.net.URISyntaxException: Illegal character in opaque part at index 2: 
C:\projects\spark\target\tmp\spark-dae61ab3-a851-4dd3-bf4e-be97c501f254
   ...

 - parquet: schema should be inferred and saved when INFER_AND_SAVE is 
specified *** FAILED *** (203 milliseconds)
   java.net.URISyntaxException: Illegal character in opaque part at index 2: 
C:\projects\spark\target\tmp\spark-fa3aff89-a66e-4376-9a37-2a9b87596939
   ...

 - orc: schema should be inferred but not stored when INFER_ONLY is specified 
*** FAILED *** (141 milliseconds)
   java.net.URISyntaxException: Illegal character in opaque part at index 2: 
C:\projects\spark\target\tmp\spark-fb464e59-b049-481b-9c75-f53295c9fc2c
   ...

 - parquet: schema should be inferred but not stored when INFER_ONLY is 
specified *** FAILED *** (125 milliseconds)
   java.net.URISyntaxException: Illegal character in opaque part at index 2: 
C:\projects\spark\target\tmp\spark-9487568e-80a4-42b3-b0a5-d95314c4ccbc
   ...

 - orc: schema should not be inferred when NEVER_INFER is specified *** FAILED 
*** (156 milliseconds)
   java.net.URISyntaxException: Illegal character in opaque part at index 2: 
C:\projects\spark\target\tmp\spark-0d2dfa45-1b0f-4958-a8be-1074ed0135a
   ...

 - parquet: schema should not be inferred when NEVER_INFER is specified *** 
FAILED *** (547 milliseconds)
   java.net.URISyntaxException: Illegal character in opaque part at index 2: 
C:\projects\spark\target\tmp\spark-6d95d64e-613e-4a59-a0f6-d198c5aa51ee
   ...
```

```
org.apache.spark.sql.execution.command.DDLSuite:
 - create temporary view using *** FAILED *** (15 milliseconds)
   org.apache.spark.sql.AnalysisException: Path does not exist: 
file:/C:projectsspark   arget   mpspark-3881d9ca-561b-488d-90b9-97587472b853
mp;
   ...

 - insert data to a data source table which has a non-existing location should 
succeed *** FAILED *** (109 milliseconds)
   file:/C:projectsspark%09arget%09mpspark-4cad3d19-6085-4b75-b407-fe5e9d21df54 
did not equal 
file:///C:/projects/spark/target/tmp/spark-4cad3d19-6085-4b75-b407-fe5e9d21df54 
(DDLSuite.scala:1869)
   ...

 - insert into a data source table with a non-existing partition location 
should succeed *** FAILED *** (94 milliseconds)
   file:/C:projectsspark%09arget%09mpspark-4b52e7de-e3aa-42fd-95d4-6d4d58d1d95d 
did not equal 
file:///C:/projects/spark/target/tmp/spark-4b52e7de-e3aa-42fd-95d4-6d4d58d1d95d 
(DDLSuite.scala:1910)
   ...

 - read data from a data source table which has a non-existing location should 
succeed *** FAILED *** (93 milliseconds)
   file:/C:projectsspark%09arget%09mpspark-f8c281e2-08c2-4f73-abbf-f3865b702c34 
did not equal 
file:///C:/projects/spark/target/tmp/spark-f8c281e2-08c2-4f73-abbf-f3865b702c34 
(DDLSuite.scala:1937)
   ...

 - read data from a data source table with non-existing partition location 
should succeed *** FAILED *** (110 milliseconds)
   java.lang.IllegalArgumentException: Can not create a Path from an empty 
string
   ...

 - create datasource table with a non-existing location *** FAILED *** (94 
milliseconds)
   file:/C:projectsspark%09arget%09mpspark-387316ae-070c-4e78-9b78-19ebf7b29ec8 
did not equal 
file:///C:/projects/spark/target/tmp/spark-387316ae-070c-4e78-9b78-19ebf7b29ec8 
(DDLSuite.scala:1982)
   ...

 - CTAS for external data source table with a non-existing location *** FAILED 
*** (16 milliseconds)
   java.lang.IllegalArgumentException: Can not create a Path from an empty 
string
   ...

 - CTAS for external data source table with a existed location *** FAILED *** 
(15 milliseconds)
   java.lang.IllegalArgumentException: Can not create a Path from an empty 
string
   ...

 - data source table:partition column name containing a b *** FAILED *** (125 
milliseconds)
   

spark git commit: [SPARK-19707][SPARK-18922][TESTS][SQL][CORE] Fix test failures/the invalid path check for sc.addJar on Windows

2017-05-25 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 022a4957d -> 5ae1c6521


[SPARK-19707][SPARK-18922][TESTS][SQL][CORE] Fix test failures/the invalid path 
check for sc.addJar on Windows

## What changes were proposed in this pull request?

This PR proposes two things:

- A follow up for SPARK-19707 (Improving the invalid path check for sc.addJar 
on Windows as well).

```
org.apache.spark.SparkContextSuite:
 - add jar with invalid path *** FAILED *** (32 milliseconds)
   2 was not equal to 1 (SparkContextSuite.scala:309)
   ...
```

- Fix path vs URI related test failures on Windows.

```
org.apache.spark.storage.LocalDirsSuite:
 - SPARK_LOCAL_DIRS override also affects driver *** FAILED *** (0 milliseconds)
   new java.io.File("/NONEXISTENT_PATH").exists() was true 
(LocalDirsSuite.scala:50)
   ...

 - Utils.getLocalDir() throws an exception if any temporary directory cannot be 
retrieved *** FAILED *** (15 milliseconds)
   Expected exception java.io.IOException to be thrown, but no exception was 
thrown. (LocalDirsSuite.scala:64)
   ...
```

```
org.apache.spark.sql.hive.HiveSchemaInferenceSuite:
 - orc: schema should be inferred and saved when INFER_AND_SAVE is specified 
*** FAILED *** (203 milliseconds)
   java.net.URISyntaxException: Illegal character in opaque part at index 2: 
C:\projects\spark\target\tmp\spark-dae61ab3-a851-4dd3-bf4e-be97c501f254
   ...

 - parquet: schema should be inferred and saved when INFER_AND_SAVE is 
specified *** FAILED *** (203 milliseconds)
   java.net.URISyntaxException: Illegal character in opaque part at index 2: 
C:\projects\spark\target\tmp\spark-fa3aff89-a66e-4376-9a37-2a9b87596939
   ...

 - orc: schema should be inferred but not stored when INFER_ONLY is specified 
*** FAILED *** (141 milliseconds)
   java.net.URISyntaxException: Illegal character in opaque part at index 2: 
C:\projects\spark\target\tmp\spark-fb464e59-b049-481b-9c75-f53295c9fc2c
   ...

 - parquet: schema should be inferred but not stored when INFER_ONLY is 
specified *** FAILED *** (125 milliseconds)
   java.net.URISyntaxException: Illegal character in opaque part at index 2: 
C:\projects\spark\target\tmp\spark-9487568e-80a4-42b3-b0a5-d95314c4ccbc
   ...

 - orc: schema should not be inferred when NEVER_INFER is specified *** FAILED 
*** (156 milliseconds)
   java.net.URISyntaxException: Illegal character in opaque part at index 2: 
C:\projects\spark\target\tmp\spark-0d2dfa45-1b0f-4958-a8be-1074ed0135a
   ...

 - parquet: schema should not be inferred when NEVER_INFER is specified *** 
FAILED *** (547 milliseconds)
   java.net.URISyntaxException: Illegal character in opaque part at index 2: 
C:\projects\spark\target\tmp\spark-6d95d64e-613e-4a59-a0f6-d198c5aa51ee
   ...
```

```
org.apache.spark.sql.execution.command.DDLSuite:
 - create temporary view using *** FAILED *** (15 milliseconds)
   org.apache.spark.sql.AnalysisException: Path does not exist: 
file:/C:projectsspark   arget   mpspark-3881d9ca-561b-488d-90b9-97587472b853
mp;
   ...

 - insert data to a data source table which has a non-existing location should 
succeed *** FAILED *** (109 milliseconds)
   file:/C:projectsspark%09arget%09mpspark-4cad3d19-6085-4b75-b407-fe5e9d21df54 
did not equal 
file:///C:/projects/spark/target/tmp/spark-4cad3d19-6085-4b75-b407-fe5e9d21df54 
(DDLSuite.scala:1869)
   ...

 - insert into a data source table with a non-existing partition location 
should succeed *** FAILED *** (94 milliseconds)
   file:/C:projectsspark%09arget%09mpspark-4b52e7de-e3aa-42fd-95d4-6d4d58d1d95d 
did not equal 
file:///C:/projects/spark/target/tmp/spark-4b52e7de-e3aa-42fd-95d4-6d4d58d1d95d 
(DDLSuite.scala:1910)
   ...

 - read data from a data source table which has a non-existing location should 
succeed *** FAILED *** (93 milliseconds)
   file:/C:projectsspark%09arget%09mpspark-f8c281e2-08c2-4f73-abbf-f3865b702c34 
did not equal 
file:///C:/projects/spark/target/tmp/spark-f8c281e2-08c2-4f73-abbf-f3865b702c34 
(DDLSuite.scala:1937)
   ...

 - read data from a data source table with non-existing partition location 
should succeed *** FAILED *** (110 milliseconds)
   java.lang.IllegalArgumentException: Can not create a Path from an empty 
string
   ...

 - create datasource table with a non-existing location *** FAILED *** (94 
milliseconds)
   file:/C:projectsspark%09arget%09mpspark-387316ae-070c-4e78-9b78-19ebf7b29ec8 
did not equal 
file:///C:/projects/spark/target/tmp/spark-387316ae-070c-4e78-9b78-19ebf7b29ec8 
(DDLSuite.scala:1982)
   ...

 - CTAS for external data source table with a non-existing location *** FAILED 
*** (16 milliseconds)
   java.lang.IllegalArgumentException: Can not create a Path from an empty 
string
   ...

 - CTAS for external data source table with a existed location *** FAILED *** 
(15 milliseconds)
   java.lang.IllegalArgumentException: Can not create a Path from an empty 
string
   ...

 - data source table:partition column name containing a b *** FAILED *** (125 
milliseconds)
   

spark git commit: [SPARK-20741][SPARK SUBMIT] Added cleanup of JARs archive generated by SparkSubmit

2017-05-25 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 e01f1f222 -> 022a4957d


[SPARK-20741][SPARK SUBMIT] Added cleanup of JARs archive generated by 
SparkSubmit

## What changes were proposed in this pull request?

Deleted generated JARs archive after distribution to HDFS
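
A simplified, hypothetical stand-in for the one-line change shown in the diff 
below (`buildJarsArchive` and `uploadToHdfs` are made-up helpers; the real code 
calls `distribute` in `Client.scala`):

```
import java.io.File
import java.nio.file.Files

// Hypothetical helpers standing in for Client.scala's archive creation and upload.
def buildJarsArchive(): File = Files.createTempFile("spark-libs-", ".zip").toFile
def uploadToHdfs(archive: File): Unit = println(s"uploading ${archive.getPath}")

val jarsArchive = buildJarsArchive()
uploadToHdfs(jarsArchive)
jarsArchive.delete()   // the new cleanup step: free local disk once the archive is uploaded
```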

## How was this patch tested?

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Lior Regev 

Closes #17986 from liorregev/master.

(cherry picked from commit 7306d556903c832984c7f34f1e8fe738a4b2343c)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/022a4957
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/022a4957
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/022a4957

Branch: refs/heads/branch-2.2
Commit: 022a4957d8dc8d6049e0a8c9191fcfd1bd95a4a4
Parents: e01f1f2
Author: Lior Regev 
Authored: Thu May 25 17:08:19 2017 +0100
Committer: Sean Owen 
Committed: Thu May 25 17:08:41 2017 +0100

--
 .../yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala   | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/022a4957/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
--
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index b817570..9956071 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -545,6 +545,7 @@ private[spark] class Client(
   distribute(jarsArchive.toURI.getPath,
 resType = LocalResourceType.ARCHIVE,
 destName = Some(LOCALIZED_LIB_DIR))
+  jarsArchive.delete()
   }
 }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-20741][SPARK SUBMIT] Added cleanup of JARs archive generated by SparkSubmit

2017-05-25 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 139da116f -> 7306d5569


[SPARK-20741][SPARK SUBMIT] Added cleanup of JARs archive generated by 
SparkSubmit

## What changes were proposed in this pull request?

Deleted generated JARs archive after distribution to HDFS

## How was this patch tested?

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Lior Regev 

Closes #17986 from liorregev/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7306d556
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7306d556
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7306d556

Branch: refs/heads/master
Commit: 7306d556903c832984c7f34f1e8fe738a4b2343c
Parents: 139da11
Author: Lior Regev 
Authored: Thu May 25 17:08:19 2017 +0100
Committer: Sean Owen 
Committed: Thu May 25 17:08:19 2017 +0100

--
 .../yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala   | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7306d556/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
--
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index b817570..9956071 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -545,6 +545,7 @@ private[spark] class Client(
   distribute(jarsArchive.toURI.getPath,
 resType = LocalResourceType.ARCHIVE,
 destName = Some(LOCALIZED_LIB_DIR))
+  jarsArchive.delete()
   }
 }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-20768][PYSPARK][ML] Expose numPartitions (expert) param of PySpark FPGrowth.

2017-05-25 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 9cbf39f1c -> e01f1f222


[SPARK-20768][PYSPARK][ML] Expose numPartitions (expert) param of PySpark 
FPGrowth.

## What changes were proposed in this pull request?

Expose numPartitions (expert) param of PySpark FPGrowth.

## How was this patch tested?

+ [x] Pass all unit tests.

Author: Yan Facai (颜发才) 

Closes #18058 from facaiy/ENH/pyspark_fpg_add_num_partition.

(cherry picked from commit 139da116f130ed21481d3e9bdee5df4b8d7760ac)
Signed-off-by: Yanbo Liang 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e01f1f22
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e01f1f22
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e01f1f22

Branch: refs/heads/branch-2.2
Commit: e01f1f222bcb7c469b1e1595e9338ed478d99894
Parents: 9cbf39f
Author: Yan Facai (颜发才) 
Authored: Thu May 25 21:40:39 2017 +0800
Committer: Yanbo Liang 
Committed: Thu May 25 21:40:52 2017 +0800

--
 python/pyspark/ml/fpm.py | 30 +-
 1 file changed, 29 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e01f1f22/python/pyspark/ml/fpm.py
--
diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py
index 6ff7d2c..dd7dda5 100644
--- a/python/pyspark/ml/fpm.py
+++ b/python/pyspark/ml/fpm.py
@@ -49,6 +49,32 @@ class HasMinSupport(Params):
 return self.getOrDefault(self.minSupport)
 
 
+class HasNumPartitions(Params):
+"""
+Mixin for param numPartitions: Number of partitions (at least 1) used by 
parallel FP-growth.
+"""
+
+numPartitions = Param(
+Params._dummy(),
+"numPartitions",
+"Number of partitions (at least 1) used by parallel FP-growth. " +
+"By default the param is not set, " +
+"and partition number of the input dataset is used.",
+typeConverter=TypeConverters.toInt)
+
+def setNumPartitions(self, value):
+"""
+Sets the value of :py:attr:`numPartitions`.
+"""
+return self._set(numPartitions=value)
+
+def getNumPartitions(self):
+"""
+Gets the value of :py:attr:`numPartitions` or its default value.
+"""
+return self.getOrDefault(self.numPartitions)
+
+
 class HasMinConfidence(Params):
 """
 Mixin for param minConfidence.
@@ -127,7 +153,9 @@ class FPGrowthModel(JavaModel, JavaMLWritable, 
JavaMLReadable):
 
 
 class FPGrowth(JavaEstimator, HasItemsCol, HasPredictionCol,
-   HasMinSupport, HasMinConfidence, JavaMLWritable, 
JavaMLReadable):
+   HasMinSupport, HasNumPartitions, HasMinConfidence,
+   JavaMLWritable, JavaMLReadable):
+
 """
 .. note:: Experimental
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-20768][PYSPARK][ML] Expose numPartitions (expert) param of PySpark FPGrowth.

2017-05-25 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/master 913a6bfe4 -> 139da116f


[SPARK-20768][PYSPARK][ML] Expose numPartitions (expert) param of PySpark 
FPGrowth.

## What changes were proposed in this pull request?

Expose numPartitions (expert) param of PySpark FPGrowth.

## How was this patch tested?

+ [x] Pass all unit tests.

Author: Yan Facai (颜发才) 

Closes #18058 from facaiy/ENH/pyspark_fpg_add_num_partition.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/139da116
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/139da116
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/139da116

Branch: refs/heads/master
Commit: 139da116f130ed21481d3e9bdee5df4b8d7760ac
Parents: 913a6bf
Author: Yan Facai (颜发才) 
Authored: Thu May 25 21:40:39 2017 +0800
Committer: Yanbo Liang 
Committed: Thu May 25 21:40:39 2017 +0800

--
 python/pyspark/ml/fpm.py | 30 +-
 1 file changed, 29 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/139da116/python/pyspark/ml/fpm.py
--
diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py
index 6ff7d2c..dd7dda5 100644
--- a/python/pyspark/ml/fpm.py
+++ b/python/pyspark/ml/fpm.py
@@ -49,6 +49,32 @@ class HasMinSupport(Params):
 return self.getOrDefault(self.minSupport)
 
 
+class HasNumPartitions(Params):
+"""
+Mixin for param numPartitions: Number of partitions (at least 1) used by 
parallel FP-growth.
+"""
+
+numPartitions = Param(
+Params._dummy(),
+"numPartitions",
+"Number of partitions (at least 1) used by parallel FP-growth. " +
+"By default the param is not set, " +
+"and partition number of the input dataset is used.",
+typeConverter=TypeConverters.toInt)
+
+def setNumPartitions(self, value):
+"""
+Sets the value of :py:attr:`numPartitions`.
+"""
+return self._set(numPartitions=value)
+
+def getNumPartitions(self):
+"""
+Gets the value of :py:attr:`numPartitions` or its default value.
+"""
+return self.getOrDefault(self.numPartitions)
+
+
 class HasMinConfidence(Params):
 """
 Mixin for param minConfidence.
@@ -127,7 +153,9 @@ class FPGrowthModel(JavaModel, JavaMLWritable, 
JavaMLReadable):
 
 
 class FPGrowth(JavaEstimator, HasItemsCol, HasPredictionCol,
-   HasMinSupport, HasMinConfidence, JavaMLWritable, 
JavaMLReadable):
+   HasMinSupport, HasNumPartitions, HasMinConfidence,
+   JavaMLWritable, JavaMLReadable):
+
 """
 .. note:: Experimental
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-19281][FOLLOWUP][ML] Minor fix for PySpark FPGrowth.

2017-05-25 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 8896c4ee9 -> 9cbf39f1c


[SPARK-19281][FOLLOWUP][ML] Minor fix for PySpark FPGrowth.

## What changes were proposed in this pull request?
Follow-up for #17218, some minor fix for PySpark ```FPGrowth```.

## How was this patch tested?
Existing UT.

Author: Yanbo Liang 

Closes #18089 from yanboliang/spark-19281.

(cherry picked from commit 913a6bfe4b0eb6b80a03b858ab4b2767194103de)
Signed-off-by: Yanbo Liang 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9cbf39f1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9cbf39f1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9cbf39f1

Branch: refs/heads/branch-2.2
Commit: 9cbf39f1c74f16483865cd93d6ffc3c521e878a7
Parents: 8896c4e
Author: Yanbo Liang 
Authored: Thu May 25 20:15:15 2017 +0800
Committer: Yanbo Liang 
Committed: Thu May 25 20:15:38 2017 +0800

--
 python/pyspark/ml/fpm.py | 21 +++--
 1 file changed, 11 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9cbf39f1/python/pyspark/ml/fpm.py
--
diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py
index b30d4ed..6ff7d2c 100644
--- a/python/pyspark/ml/fpm.py
+++ b/python/pyspark/ml/fpm.py
@@ -23,17 +23,17 @@ from pyspark.ml.param.shared import *
 __all__ = ["FPGrowth", "FPGrowthModel"]
 
 
-class HasSupport(Params):
+class HasMinSupport(Params):
 """
-Mixin for param support.
+Mixin for param minSupport.
 """
 
 minSupport = Param(
 Params._dummy(),
 "minSupport",
-"""Minimal support level of the frequent pattern. [0.0, 1.0].
-Any pattern that appears more than (minSupport * size-of-the-dataset)
-times will be output""",
+"Minimal support level of the frequent pattern. [0.0, 1.0]. " +
+"Any pattern that appears more than (minSupport * size-of-the-dataset) 
" +
+"times will be output in the frequent itemsets.",
 typeConverter=TypeConverters.toFloat)
 
 def setMinSupport(self, value):
@@ -49,16 +49,17 @@ class HasSupport(Params):
 return self.getOrDefault(self.minSupport)
 
 
-class HasConfidence(Params):
+class HasMinConfidence(Params):
 """
-Mixin for param confidence.
+Mixin for param minConfidence.
 """
 
 minConfidence = Param(
 Params._dummy(),
 "minConfidence",
-"""Minimal confidence for generating Association Rule. [0.0, 1.0]
-Note that minConfidence has no effect during fitting.""",
+"Minimal confidence for generating Association Rule. [0.0, 1.0]. " +
+"minConfidence will not affect the mining for frequent itemsets, " +
+"but will affect the association rules generation.",
 typeConverter=TypeConverters.toFloat)
 
 def setMinConfidence(self, value):
@@ -126,7 +127,7 @@ class FPGrowthModel(JavaModel, JavaMLWritable, 
JavaMLReadable):
 
 
 class FPGrowth(JavaEstimator, HasItemsCol, HasPredictionCol,
-   HasSupport, HasConfidence, JavaMLWritable, JavaMLReadable):
+   HasMinSupport, HasMinConfidence, JavaMLWritable, 
JavaMLReadable):
 """
 .. note:: Experimental
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-19281][FOLLOWUP][ML] Minor fix for PySpark FPGrowth.

2017-05-25 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/master 3f94e64aa -> 913a6bfe4


[SPARK-19281][FOLLOWUP][ML] Minor fix for PySpark FPGrowth.

## What changes were proposed in this pull request?
Follow-up for #17218, some minor fix for PySpark ```FPGrowth```.

## How was this patch tested?
Existing UT.

Author: Yanbo Liang 

Closes #18089 from yanboliang/spark-19281.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/913a6bfe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/913a6bfe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/913a6bfe

Branch: refs/heads/master
Commit: 913a6bfe4b0eb6b80a03b858ab4b2767194103de
Parents: 3f94e64
Author: Yanbo Liang 
Authored: Thu May 25 20:15:15 2017 +0800
Committer: Yanbo Liang 
Committed: Thu May 25 20:15:15 2017 +0800

--
 python/pyspark/ml/fpm.py | 21 +++--
 1 file changed, 11 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/913a6bfe/python/pyspark/ml/fpm.py
--
diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py
index b30d4ed..6ff7d2c 100644
--- a/python/pyspark/ml/fpm.py
+++ b/python/pyspark/ml/fpm.py
@@ -23,17 +23,17 @@ from pyspark.ml.param.shared import *
 __all__ = ["FPGrowth", "FPGrowthModel"]
 
 
-class HasSupport(Params):
+class HasMinSupport(Params):
 """
-Mixin for param support.
+Mixin for param minSupport.
 """
 
 minSupport = Param(
 Params._dummy(),
 "minSupport",
-"""Minimal support level of the frequent pattern. [0.0, 1.0].
-Any pattern that appears more than (minSupport * size-of-the-dataset)
-times will be output""",
+"Minimal support level of the frequent pattern. [0.0, 1.0]. " +
+"Any pattern that appears more than (minSupport * size-of-the-dataset) 
" +
+"times will be output in the frequent itemsets.",
 typeConverter=TypeConverters.toFloat)
 
 def setMinSupport(self, value):
@@ -49,16 +49,17 @@ class HasSupport(Params):
 return self.getOrDefault(self.minSupport)
 
 
-class HasConfidence(Params):
+class HasMinConfidence(Params):
 """
-Mixin for param confidence.
+Mixin for param minConfidence.
 """
 
 minConfidence = Param(
 Params._dummy(),
 "minConfidence",
-"""Minimal confidence for generating Association Rule. [0.0, 1.0]
-Note that minConfidence has no effect during fitting.""",
+"Minimal confidence for generating Association Rule. [0.0, 1.0]. " +
+"minConfidence will not affect the mining for frequent itemsets, " +
+"but will affect the association rules generation.",
 typeConverter=TypeConverters.toFloat)
 
 def setMinConfidence(self, value):
@@ -126,7 +127,7 @@ class FPGrowthModel(JavaModel, JavaMLWritable, 
JavaMLReadable):
 
 
 class FPGrowth(JavaEstimator, HasItemsCol, HasPredictionCol,
-   HasSupport, HasConfidence, JavaMLWritable, JavaMLReadable):
+   HasMinSupport, HasMinConfidence, JavaMLWritable, 
JavaMLReadable):
 """
 .. note:: Experimental
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-19659] Fetch big blocks to disk when shuffle-read.

2017-05-25 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 b52a06d70 -> 8896c4ee9


[SPARK-19659] Fetch big blocks to disk when shuffle-read.

## What changes were proposed in this pull request?

Currently the whole block is fetched into memory (off-heap by default) during 
shuffle-read. A block is defined by (shuffleId, mapId, reduceId), so it can be 
large in skew situations. If OOM happens during shuffle read, the job is killed 
and users are advised to "Consider boosting 
spark.yarn.executor.memoryOverhead". Adjusting the parameter and allocating 
more memory can resolve the OOM, but that approach is not well suited to 
production environments, especially data warehouses.
When using Spark SQL as the data engine in a warehouse, users hope to have a 
unified parameter (e.g. memory) with less wasted resource (resource that is 
allocated but not used). The hope is especially strong when migrating to Spark 
from another engine (e.g. Hive); tuning the parameter for thousands of SQL 
queries one by one is very time consuming.
Skew situations are not always easy to predict; when they happen, it makes 
sense to fetch remote blocks to disk for shuffle-read rather than kill the job 
because of OOM.

In this PR, I propose to fetch big blocks to disk (also mentioned in 
SPARK-3019):

1. Track the average size and also the outliers (blocks larger than 2*avgSize) 
in MapStatus;
2. Request memory from `MemoryManager` before fetching blocks and release the 
memory to `MemoryManager` when the `ManagedBuffer` is released;
3. Fetch remote blocks to disk when acquiring memory from `MemoryManager` 
fails, otherwise fetch to memory.

This improves memory control when shuffling blocks and helps avoid OOM in 
scenarios like the following (see the sketch after this list):
1. A single huge block;
2. The sizes of many blocks are underestimated in `MapStatus` and the actual 
footprint of the blocks is much larger than estimated.
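
Below is a minimal sketch of the fetch-to-disk decision described above; the 
types and method names are simplified stand-ins, not Spark's actual shuffle or 
memory-manager API:

```
import java.io.{File, InputStream}
import java.nio.file.{Files, StandardCopyOption}

trait SimpleMemoryManager {                      // hypothetical, simplified
  def tryAcquire(bytes: Long): Boolean
  def release(bytes: Long): Unit
}

sealed trait FetchResult
case class InMemoryBlock(data: Array[Byte]) extends FetchResult
case class OnDiskBlock(file: File) extends FetchResult

def fetchRemoteBlock(estimatedSize: Long,
                     open: () => InputStream,
                     mm: SimpleMemoryManager): FetchResult = {
  val in = open()
  try {
    if (mm.tryAcquire(estimatedSize)) {
      // Memory granted: keep the block in memory as before
      // (mm.release would be called when the buffer is freed).
      val out = new java.io.ByteArrayOutputStream()
      val buf = new Array[Byte](64 * 1024)
      var n = in.read(buf)
      while (n != -1) { out.write(buf, 0, n); n = in.read(buf) }
      InMemoryBlock(out.toByteArray)
    } else {
      // Memory denied (e.g. a skewed, oversized block): stream it to a local file.
      val tmp = Files.createTempFile("remote-shuffle-block-", ".data").toFile
      Files.copy(in, tmp.toPath, StandardCopyOption.REPLACE_EXISTING)
      OnDiskBlock(tmp)
    }
  } finally in.close()
}
```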

## How was this patch tested?
Added unit test in `MapStatusSuite` and `ShuffleBlockFetcherIteratorSuite`.

Author: jinxing 

Closes #16989 from jinxing64/SPARK-19659.

(cherry picked from commit 3f94e64aa8fd806ae1fa0156d846ce96afacddd3)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8896c4ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8896c4ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8896c4ee

Branch: refs/heads/branch-2.2
Commit: 8896c4ee9ea315a7dcd1a05b7201e7ad0539a5ed
Parents: b52a06d
Author: jinxing 
Authored: Thu May 25 16:11:30 2017 +0800
Committer: Wenchen Fan 
Committed: Thu May 25 16:11:51 2017 +0800

--
 .../network/server/OneForOneStreamManager.java  | 21 +
 .../network/shuffle/ExternalShuffleClient.java  |  7 +-
 .../network/shuffle/OneForOneBlockFetcher.java  | 62 +-
 .../spark/network/shuffle/ShuffleClient.java|  4 +-
 .../network/sasl/SaslIntegrationSuite.java  |  2 +-
 .../ExternalShuffleIntegrationSuite.java|  2 +-
 .../shuffle/OneForOneBlockFetcherSuite.java |  7 +-
 .../apache/spark/internal/config/package.scala  |  6 ++
 .../spark/network/BlockTransferService.scala|  7 +-
 .../netty/NettyBlockTransferService.scala   |  7 +-
 .../spark/shuffle/BlockStoreShuffleReader.scala |  3 +-
 .../storage/ShuffleBlockFetcherIterator.scala   | 71 ++--
 .../apache/spark/MapOutputTrackerSuite.scala|  2 +-
 .../netty/NettyBlockTransferSecuritySuite.scala |  2 +-
 .../spark/storage/BlockManagerSuite.scala   |  4 +-
 .../ShuffleBlockFetcherIteratorSuite.scala  | 86 ++--
 docs/configuration.md   |  8 ++
 17 files changed, 254 insertions(+), 47 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8896c4ee/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
--
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
 
b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
index ee367f9..ad8e8b4 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
@@ -23,6 +23,8 @@ import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
+import scala.Tuple2;
+
 import com.google.common.base.Preconditions;
 import io.netty.channel.Channel;
 import org.slf4j.Logger;
@@ -95,6 +97,25 @@ public class OneForOneStreamManager extends StreamManager {
   }
 
   @Override
+  

spark git commit: [SPARK-19659] Fetch big blocks to disk when shuffle-read.

2017-05-25 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 731462a04 -> 3f94e64aa


[SPARK-19659] Fetch big blocks to disk when shuffle-read.

## What changes were proposed in this pull request?

Currently the whole block is fetched into memory (off heap by default) during 
shuffle-read. A block is identified by (shuffleId, mapId, reduceId), so it can 
be large in skew situations. If OOM happens during shuffle read, the job is 
killed and users are told to "Consider boosting 
spark.yarn.executor.memoryOverhead". Adjusting the parameter and allocating more 
memory can resolve the OOM, but this approach is not well suited for a 
production environment, especially a data warehouse.
When using Spark SQL as the data engine in a warehouse, users hope for a unified 
parameter (e.g. memory) with less resource wasted (resource that is allocated but 
not used). This hope is especially strong when migrating the data engine to Spark 
from another one (e.g. Hive): tuning the parameter for thousands of SQL queries 
one by one is very time consuming.
It's not always easy to predict skew situations; when they happen, it makes sense 
to fetch remote blocks to disk for shuffle-read rather than kill the job because 
of OOM.

In this PR, I propose to fetch big blocks to disk (which is also mentioned in 
SPARK-3019):

1. Track the average block size and also the outliers (blocks larger than 
2*avgSize) in `MapStatus`;
2. Request memory from `MemoryManager` before fetching blocks, and release the 
memory back to `MemoryManager` when the `ManagedBuffer` is released;
3. Fetch remote blocks to disk when acquiring memory from `MemoryManager` fails; 
otherwise fetch to memory (a sketch of this decision follows the list).
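
To make items 2 and 3 concrete, here is a hedged, self-contained sketch of the 
decision; the `MemoryReserver` trait and the file handling below are stand-ins, 
not the real `MemoryManager` or `ShuffleBlockFetcherIterator` APIs:

```scala
import java.io.{ByteArrayOutputStream, File, FileOutputStream, InputStream, OutputStream}
import java.nio.file.Files

// Stand-in for the real memory accounting: tryReserve answers whether the
// requested number of bytes could be acquired, release gives them back.
trait MemoryReserver {
  def tryReserve(bytes: Long): Boolean
  def release(bytes: Long): Unit
}

sealed trait FetchResult
case class InMemoryBlock(data: Array[Byte]) extends FetchResult
case class OnDiskBlock(file: File) extends FetchResult

object FetchSketch {
  private def copy(in: InputStream, out: OutputStream): Unit = {
    val buf = new Array[Byte](8192)
    var n = in.read(buf)
    while (n != -1) { out.write(buf, 0, n); n = in.read(buf) }
  }

  // Items 2 and 3 in one place: reserve memory before reading the block; if
  // the reservation fails, stream the bytes to a temporary file instead.
  def fetchBlock(blockSize: Long, in: InputStream, memory: MemoryReserver): FetchResult = {
    if (memory.tryReserve(blockSize)) {
      try {
        val out = new ByteArrayOutputStream()
        copy(in, out)
        InMemoryBlock(out.toByteArray) // caller releases the reservation when done
      } catch {
        case e: Throwable =>
          memory.release(blockSize)    // don't leak the reservation on failure
          throw e
      }
    } else {
      val file = Files.createTempFile("shuffle-block", ".data").toFile
      val out = new FileOutputStream(file)
      try copy(in, out) finally out.close()
      OnDiskBlock(file)
    }
  }
}
```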

This improves memory control when fetching shuffle blocks and helps to avoid 
OOM in scenarios like the following:
1. A single huge block;
2. The sizes of many blocks are underestimated in `MapStatus`, so the actual 
footprint of the blocks is much larger than estimated.

## How was this patch tested?
Added unit test in `MapStatusSuite` and `ShuffleBlockFetcherIteratorSuite`.

Author: jinxing 

Closes #16989 from jinxing64/SPARK-19659.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3f94e64a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3f94e64a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3f94e64a

Branch: refs/heads/master
Commit: 3f94e64aa8fd806ae1fa0156d846ce96afacddd3
Parents: 731462a
Author: jinxing 
Authored: Thu May 25 16:11:30 2017 +0800
Committer: Wenchen Fan 
Committed: Thu May 25 16:11:30 2017 +0800

--
 .../network/server/OneForOneStreamManager.java  | 21 +
 .../network/shuffle/ExternalShuffleClient.java  |  7 +-
 .../network/shuffle/OneForOneBlockFetcher.java  | 62 +-
 .../spark/network/shuffle/ShuffleClient.java|  4 +-
 .../network/sasl/SaslIntegrationSuite.java  |  2 +-
 .../ExternalShuffleIntegrationSuite.java|  2 +-
 .../shuffle/OneForOneBlockFetcherSuite.java |  7 +-
 .../apache/spark/internal/config/package.scala  |  6 ++
 .../spark/network/BlockTransferService.scala|  7 +-
 .../netty/NettyBlockTransferService.scala   |  7 +-
 .../spark/shuffle/BlockStoreShuffleReader.scala |  3 +-
 .../storage/ShuffleBlockFetcherIterator.scala   | 71 ++--
 .../apache/spark/MapOutputTrackerSuite.scala|  2 +-
 .../netty/NettyBlockTransferSecuritySuite.scala |  2 +-
 .../spark/storage/BlockManagerSuite.scala   |  4 +-
 .../ShuffleBlockFetcherIteratorSuite.scala  | 86 ++--
 docs/configuration.md   |  8 ++
 17 files changed, 254 insertions(+), 47 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3f94e64a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
--
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
 
b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
index ee367f9..ad8e8b4 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
@@ -23,6 +23,8 @@ import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
+import scala.Tuple2;
+
 import com.google.common.base.Preconditions;
 import io.netty.channel.Channel;
 import org.slf4j.Logger;
@@ -95,6 +97,25 @@ public class OneForOneStreamManager extends StreamManager {
   }
 
   @Override
+  public ManagedBuffer openStream(String streamChunkId) {
+Tuple2 streamIdAndChunkId = 

spark git commit: [SPARK-20250][CORE] Improper OOM error when a task been killed while spilling data

2017-05-25 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 79fbfbbc7 -> ef0ebdde0


[SPARK-20250][CORE] Improper OOM error when a task been killed while spilling 
data

Currently, when a task is calling spill() but receives a kill request from the 
driver (e.g., for a speculative task), the `TaskMemoryManager` will throw an 
`OOM` exception, and we don't catch `Fatal` exceptions when the error is caused 
by `Thread.interrupt`. So for a `ClosedByInterruptException`, we should throw a 
`RuntimeException` instead of an `OutOfMemoryError`.
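
For context, the following self-contained sketch (not Spark code) shows how a 
kill delivered via `Thread.interrupt` surfaces during a spill-style file write 
as a `ClosedByInterruptException`, and rethrows it as a `RuntimeException` in 
the spirit of the change described above:

```scala
import java.nio.ByteBuffer
import java.nio.channels.{ClosedByInterruptException, FileChannel}
import java.nio.file.{Files, StandardOpenOption}

object InterruptedSpillSketch {
  def main(args: Array[String]): Unit = {
    val path = Files.createTempFile("spill", ".tmp")
    val channel = FileChannel.open(path, StandardOpenOption.WRITE)
    // Simulate a kill request: set the interrupt flag on the thread that is
    // about to spill. An interrupted thread doing NIO channel I/O gets a
    // ClosedByInterruptException.
    Thread.currentThread().interrupt()
    try {
      channel.write(ByteBuffer.wrap(new Array[Byte](1024)))
    } catch {
      case e: ClosedByInterruptException =>
        // The spill failed because the task was killed, not because memory
        // ran out, so a task-level RuntimeException is the appropriate signal.
        throw new RuntimeException(e.getMessage, e)
    } finally {
      Thread.interrupted()              // clear the flag so cleanup can proceed
      Files.deleteIfExists(path)
    }
  }
}
```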

https://issues.apache.org/jira/browse/SPARK-20250?jql=project%20%3D%20SPARK

Existing unit tests.

Author: Xianyang Liu 

Closes #18090 from ConeyLiu/SPARK-20250.

(cherry picked from commit 731462a04f8e33ac507ad19b4270c783a012a33e)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef0ebdde
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef0ebdde
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef0ebdde

Branch: refs/heads/branch-2.0
Commit: ef0ebdde02cb130500af0ad79376563b15f921dc
Parents: 79fbfbb
Author: Xianyang Liu 
Authored: Thu May 25 15:47:59 2017 +0800
Committer: Wenchen Fan 
Committed: Thu May 25 15:52:31 2017 +0800

--
 .../java/org/apache/spark/memory/TaskMemoryManager.java | 9 +
 1 file changed, 9 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ef0ebdde/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
--
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index 867c4a1..23f6fd3 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -19,6 +19,7 @@ package org.apache.spark.memory;
 
 import javax.annotation.concurrent.GuardedBy;
 import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.HashSet;
@@ -156,6 +157,10 @@ public class TaskMemoryManager {
   break;
 }
   }
+} catch (ClosedByInterruptException e) {
+  // This called by user to kill a task (e.g: speculative task).
+  logger.error("error while calling spill() on " + c, e);
+  throw new RuntimeException(e.getMessage());
 } catch (IOException e) {
   logger.error("error while calling spill() on " + c, e);
   throw new OutOfMemoryError("error while calling spill() on " + c 
+ " : "
@@ -174,6 +179,10 @@ public class TaskMemoryManager {
   Utils.bytesToString(released), consumer);
 got += memoryManager.acquireExecutionMemory(required - got, 
taskAttemptId, mode);
   }
+} catch (ClosedByInterruptException e) {
+  // This called by user to kill a task (e.g: speculative task).
+  logger.error("error while calling spill() on " + consumer, e);
+  throw new RuntimeException(e.getMessage());
 } catch (IOException e) {
   logger.error("error while calling spill() on " + consumer, e);
   throw new OutOfMemoryError("error while calling spill() on " + 
consumer + " : "





spark git commit: [SPARK-20250][CORE] Improper OOM error when a task been killed while spilling data

2017-05-25 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 7015f6f0e -> 7fc2347b5


[SPARK-20250][CORE] Improper OOM error when a task been killed while spilling 
data

Currently, when a task is calling spill() but receives a kill request from the 
driver (e.g., for a speculative task), the `TaskMemoryManager` will throw an 
`OOM` exception, and we don't catch `Fatal` exceptions when the error is caused 
by `Thread.interrupt`. So for a `ClosedByInterruptException`, we should throw a 
`RuntimeException` instead of an `OutOfMemoryError`.

https://issues.apache.org/jira/browse/SPARK-20250?jql=project%20%3D%20SPARK

Existing unit tests.

Author: Xianyang Liu 

Closes #18090 from ConeyLiu/SPARK-20250.

(cherry picked from commit 731462a04f8e33ac507ad19b4270c783a012a33e)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7fc2347b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7fc2347b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7fc2347b

Branch: refs/heads/branch-2.1
Commit: 7fc2347b510d73fb55ab69c0579494b0761fb022
Parents: 7015f6f
Author: Xianyang Liu 
Authored: Thu May 25 15:47:59 2017 +0800
Committer: Wenchen Fan 
Committed: Thu May 25 15:51:27 2017 +0800

--
 .../java/org/apache/spark/memory/TaskMemoryManager.java | 9 +
 1 file changed, 9 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7fc2347b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
--
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index c40974b..3385d0e 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -19,6 +19,7 @@ package org.apache.spark.memory;
 
 import javax.annotation.concurrent.GuardedBy;
 import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.HashSet;
@@ -156,6 +157,10 @@ public class TaskMemoryManager {
   break;
 }
   }
+} catch (ClosedByInterruptException e) {
+  // This called by user to kill a task (e.g: speculative task).
+  logger.error("error while calling spill() on " + c, e);
+  throw new RuntimeException(e.getMessage());
 } catch (IOException e) {
   logger.error("error while calling spill() on " + c, e);
   throw new OutOfMemoryError("error while calling spill() on " + c 
+ " : "
@@ -174,6 +179,10 @@ public class TaskMemoryManager {
   Utils.bytesToString(released), consumer);
 got += memoryManager.acquireExecutionMemory(required - got, 
taskAttemptId, mode);
   }
+} catch (ClosedByInterruptException e) {
+  // This called by user to kill a task (e.g: speculative task).
+  logger.error("error while calling spill() on " + consumer, e);
+  throw new RuntimeException(e.getMessage());
 } catch (IOException e) {
   logger.error("error while calling spill() on " + consumer, e);
   throw new OutOfMemoryError("error while calling spill() on " + 
consumer + " : "





spark git commit: [SPARK-20250][CORE] Improper OOM error when a task been killed while spilling data

2017-05-25 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 e0aa23939 -> b52a06d70


[SPARK-20250][CORE] Improper OOM error when a task been killed while spilling 
data

## What changes were proposed in this pull request?

Currently, when a task is calling spill() but receives a kill request from the 
driver (e.g., for a speculative task), the `TaskMemoryManager` will throw an 
`OOM` exception, and we don't catch `Fatal` exceptions when the error is caused 
by `Thread.interrupt`. So for a `ClosedByInterruptException`, we should throw a 
`RuntimeException` instead of an `OutOfMemoryError`.

https://issues.apache.org/jira/browse/SPARK-20250?jql=project%20%3D%20SPARK

## How was this patch tested?

Existing unit tests.

Author: Xianyang Liu 

Closes #18090 from ConeyLiu/SPARK-20250.

(cherry picked from commit 731462a04f8e33ac507ad19b4270c783a012a33e)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b52a06d7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b52a06d7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b52a06d7

Branch: refs/heads/branch-2.2
Commit: b52a06d7034b3d392f7f0ee69a2fba098783e70d
Parents: e0aa239
Author: Xianyang Liu 
Authored: Thu May 25 15:47:59 2017 +0800
Committer: Wenchen Fan 
Committed: Thu May 25 15:48:16 2017 +0800

--
 .../java/org/apache/spark/memory/TaskMemoryManager.java | 9 +
 1 file changed, 9 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b52a06d7/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
--
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java 
b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index 5f91411..761ba9d 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -19,6 +19,7 @@ package org.apache.spark.memory;
 
 import javax.annotation.concurrent.GuardedBy;
 import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
 import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.BitSet;
@@ -184,6 +185,10 @@ public class TaskMemoryManager {
 break;
   }
 }
+  } catch (ClosedByInterruptException e) {
+// This called by user to kill a task (e.g: speculative task).
+logger.error("error while calling spill() on " + c, e);
+throw new RuntimeException(e.getMessage());
   } catch (IOException e) {
 logger.error("error while calling spill() on " + c, e);
 throw new OutOfMemoryError("error while calling spill() on " + c + 
" : "
@@ -201,6 +206,10 @@ public class TaskMemoryManager {
   Utils.bytesToString(released), consumer);
 got += memoryManager.acquireExecutionMemory(required - got, 
taskAttemptId, mode);
   }
+} catch (ClosedByInterruptException e) {
+  // This called by user to kill a task (e.g: speculative task).
+  logger.error("error while calling spill() on " + consumer, e);
+  throw new RuntimeException(e.getMessage());
 } catch (IOException e) {
   logger.error("error while calling spill() on " + consumer, e);
   throw new OutOfMemoryError("error while calling spill() on " + 
consumer + " : "

