spark git commit: [SPARK-20392][SQL] Set barrier to prevent re-entering a tree
Repository: spark Updated Branches: refs/heads/master f47700c9c -> 8ce0d8ffb [SPARK-20392][SQL] Set barrier to prevent re-entering a tree ## What changes were proposed in this pull request? It is reported that there is performance downgrade when applying ML pipeline for dataset with many columns but few rows. A big part of the performance downgrade comes from some operations (e.g., `select`) on DataFrame/Dataset which re-create new DataFrame/Dataset with a new `LogicalPlan`. The cost can be ignored in the usage of SQL, normally. However, it's not rare to chain dozens of pipeline stages in ML. When the query plan grows incrementally during running those stages, the total cost spent on re-creation of DataFrame grows too. In particular, the `Analyzer` will go through the big query plan even most part of it is analyzed. By eliminating part of the cost, the time to run the example code locally is reduced from about 1min to about 30 secs. In particular, the time applying the pipeline locally is mostly spent on calling transform of the 137 `Bucketizer`s. Before the change, each call of `Bucketizer`'s transform can cost about 0.4 sec. So the total time spent on all `Bucketizer`s' transform is about 50 secs. After the change, each call only costs about 0.1 sec. We also make `boundEnc` as lazy variable to reduce unnecessary running time. ### Performance improvement The codes and datasets provided by Barry Becker to re-produce this issue and benchmark can be found on the JIRA. Before this patch: about 1 min After this patch: about 20 secs ## How was this patch tested? Existing tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi HsiehCloses #17770 from viirya/SPARK-20392. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8ce0d8ff Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8ce0d8ff Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8ce0d8ff Branch: refs/heads/master Commit: 8ce0d8ffb68bd9e89c23d3a026308dcc039a1b1d Parents: f47700c Author: Liang-Chi Hsieh Authored: Fri May 26 13:45:55 2017 +0800 Committer: Wenchen Fan Committed: Fri May 26 13:45:55 2017 +0800 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 75 ++-- .../catalyst/analysis/DecimalPrecision.scala| 2 +- .../analysis/ResolveTableValuedFunctions.scala | 2 +- .../sql/catalyst/analysis/TypeCoercion.scala| 22 ++--- .../catalyst/analysis/timeZoneAnalysis.scala| 2 +- .../spark/sql/catalyst/analysis/view.scala | 2 +- .../spark/sql/catalyst/optimizer/subquery.scala | 2 +- .../catalyst/plans/logical/LogicalPlan.scala| 35 .../plans/logical/basicLogicalOperators.scala | 9 ++ .../sql/catalyst/analysis/AnalysisSuite.scala | 14 +++ .../sql/catalyst/plans/LogicalPlanSuite.scala | 26 +++--- .../scala/org/apache/spark/sql/Dataset.scala| 92 ++-- .../sql/execution/datasources/DataSource.scala | 2 +- .../spark/sql/execution/datasources/rules.scala | 2 +- .../spark/sql/execution/PlannerSuite.scala | 2 +- .../apache/spark/sql/hive/HiveStrategies.scala | 6 +- 16 files changed, 151 insertions(+), 144 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8ce0d8ff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d130962..85cf8dd 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -166,14 +166,15 @@ class Analyzer( Batch("Subquery", Once, UpdateOuterReferences), Batch("Cleanup", fixedPoint, - CleanupAliases) + CleanupAliases, + EliminateBarriers) ) /** * Analyze cte definitions and substitute child plan with analyzed cte definitions. */ object CTESubstitution extends Rule[LogicalPlan] { -def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { +def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp { case With(child, relations) => substituteCTE(child, relations.foldLeft(Seq.empty[(String, LogicalPlan)]) { case (resolved, (name, relation)) => @@ -201,7 +202,7 @@ class Analyzer( * Substitute child plan with WindowSpecDefinitions. */ object WindowsSubstitution extends Rule[LogicalPlan] { -def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators
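To make the slowdown that SPARK-20392 addresses concrete, the sketch below is user-level code, not part of the patch; the object name, column names, and stage count are made up. It chains many derived DataFrames the way a long ML pipeline of Bucketizers does and prints how large the analyzed plan grows; round-tripping through an RDD is one user-side way to truncate the lineage and keep per-stage analysis cost flat on builds that do not have the barrier.

```
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object PlanGrowthSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("plan-growth-sketch").getOrCreate()
    import spark.implicits._

    var df: DataFrame = Seq((1, 2.0), (2, 3.0)).toDF("id", "x")

    // Each select returns a new Dataset whose logical plan wraps the previous
    // one, so the tree the Analyzer walks keeps growing with every stage.
    for (i <- 1 to 50) {
      df = df.select(df.columns.map(col) :+ (df("x") * i).as(s"x_$i"): _*)
    }
    println("analyzed plan size after 50 stages: " +
      df.queryExecution.analyzed.treeString.length)

    // Round-tripping through an RDD truncates the lineage, so the stages that
    // follow start from a small plan again.
    val truncated = spark.createDataFrame(df.rdd, df.schema)
    println("after truncation: " +
      truncated.queryExecution.analyzed.treeString.length)

    spark.stop()
  }
}
```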
spark git commit: [SPARK-14659][ML] RFormula consistent with R when handling strings
Repository: spark Updated Branches: refs/heads/master 2dbe0c528 -> f47700c9c [SPARK-14659][ML] RFormula consistent with R when handling strings ## What changes were proposed in this pull request? When handling strings, the category dropped by RFormula and R are different: - RFormula drops the least frequent level - R drops the first level after ascending alphabetical ordering This PR supports different string ordering types in StringIndexer #17879 so that RFormula can drop the same level as R when handling strings using`stringOrderType = "alphabetDesc"`. ## How was this patch tested? new tests Author: Wayne ZhangCloses #17967 from actuaryzhang/RFormula. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f47700c9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f47700c9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f47700c9 Branch: refs/heads/master Commit: f47700c9cadd72a2495f97f250790449705f631f Parents: 2dbe0c5 Author: Wayne Zhang Authored: Fri May 26 10:44:40 2017 +0800 Committer: Yanbo Liang Committed: Fri May 26 10:44:40 2017 +0800 -- .../org/apache/spark/ml/feature/RFormula.scala | 44 +- .../apache/spark/ml/feature/StringIndexer.scala | 4 +- .../apache/spark/ml/feature/RFormulaSuite.scala | 84 3 files changed, 129 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f47700c9/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala index 5a3e292..1fad0a6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala @@ -26,7 +26,7 @@ import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.ml.{Estimator, Model, Pipeline, PipelineModel, PipelineStage, Transformer} import org.apache.spark.ml.attribute.AttributeGroup import org.apache.spark.ml.linalg.VectorUDT -import org.apache.spark.ml.param.{BooleanParam, Param, ParamMap} +import org.apache.spark.ml.param.{BooleanParam, Param, ParamMap, ParamValidators} import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasLabelCol} import org.apache.spark.ml.util._ import org.apache.spark.sql.{DataFrame, Dataset} @@ -37,6 +37,42 @@ import org.apache.spark.sql.types._ */ private[feature] trait RFormulaBase extends HasFeaturesCol with HasLabelCol { + /** + * Param for how to order categories of a string FEATURE column used by `StringIndexer`. + * The last category after ordering is dropped when encoding strings. + * Supported options: 'frequencyDesc', 'frequencyAsc', 'alphabetDesc', 'alphabetAsc'. + * The default value is 'frequencyDesc'. When the ordering is set to 'alphabetDesc', `RFormula` + * drops the same category as R when encoding strings. 
+ * + * The options are explained using an example `'b', 'a', 'b', 'a', 'c', 'b'`: + * {{{ + * +-+---+--+ + * | Option | Category mapped to 0 by StringIndexer | Category dropped by RFormula| + * +-+---+--+ + * | 'frequencyDesc' | most frequent category ('b') | least frequent category ('c')| + * | 'frequencyAsc' | least frequent category ('c') | most frequent category ('b') | + * | 'alphabetDesc' | last alphabetical category ('c') | first alphabetical category ('a')| + * | 'alphabetAsc' | first alphabetical category ('a') | last alphabetical category ('c') | + * +-+---+--+ + * }}} + * Note that this ordering option is NOT used for the label column. When the label column is + * indexed, it uses the default descending frequency ordering in `StringIndexer`. + * + * @group param + */ + @Since("2.3.0") + final val stringIndexerOrderType: Param[String] = new Param(this, "stringIndexerOrderType", +"How to order categories of a string FEATURE column used by StringIndexer. " + +"The last category after ordering is dropped when encoding strings. " + +s"Supported options: ${StringIndexer.supportedStringOrderType.mkString(", ")}. " + +"The default value is 'frequencyDesc'. When the ordering is set to 'alphabetDesc', " + +"RFormula drops the same category as R when encoding strings.", +
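A short usage sketch of the new ordering option follows. It assumes a Spark build containing this patch and that `RFormula` exposes a `setStringIndexerOrderType` setter for the param added above; the object name and data are made up, using the same category distribution as the scaladoc example. With `"alphabetDesc"`, the first alphabetical category ('a') is dropped, matching R, instead of the least frequent one.

```
import org.apache.spark.ml.feature.RFormula
import org.apache.spark.sql.SparkSession

object RFormulaOrderingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("rformula-ordering").getOrCreate()

    // Category distribution from the scaladoc example: 'b', 'a', 'b', 'a', 'c', 'b'.
    val df = spark.createDataFrame(Seq(
      (1.0, "b"), (0.0, "a"), (1.0, "b"), (0.0, "a"), (1.0, "c"), (0.0, "b")
    )).toDF("label", "g")

    // "alphabetDesc" drops the first alphabetical category ('a') when encoding,
    // which is what R does; the default "frequencyDesc" would drop 'c' instead.
    val formula = new RFormula()
      .setFormula("label ~ g")
      .setStringIndexerOrderType("alphabetDesc")

    formula.fit(df).transform(df).select("features", "label").show(truncate = false)

    spark.stop()
  }
}
```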
spark git commit: [SPARK-20775][SQL] Added Scala support for from_json
Repository: spark Updated Branches: refs/heads/master c1e7989c4 -> 2dbe0c528 [SPARK-20775][SQL] Added scala support from_json ## What changes were proposed in this pull request? from_json function required to take in a java.util.Hashmap. For other functions, a java wrapper is provided which casts a java hashmap to a scala map. Only a java function is provided in this case, forcing scala users to pass in a java.util.Hashmap. Added the missing wrapper. ## How was this patch tested? Added a unit test for passing in a scala map Author: setjetCloses #18094 from setjet/spark-20775. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2dbe0c52 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2dbe0c52 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2dbe0c52 Branch: refs/heads/master Commit: 2dbe0c5288b48733bae0e39a6c5d8047f4a55088 Parents: c1e7989 Author: setjet Authored: Fri May 26 10:21:39 2017 +0800 Committer: Wenchen Fan Committed: Fri May 26 10:21:39 2017 +0800 -- .../scala/org/apache/spark/sql/functions.scala | 22 ++-- .../apache/spark/sql/JsonFunctionsSuite.scala | 9 +++- 2 files changed, 28 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2dbe0c52/sql/core/src/main/scala/org/apache/spark/sql/functions.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 36c0f18..7eea6d8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -3060,8 +3060,9 @@ object functions { from_json(e, schema, Map.empty[String, String]) /** - * Parses a column containing a JSON string into a `StructType` or `ArrayType` of `StructType`s - * with the specified schema. Returns `null`, in the case of an unparseable string. + * (Java-specific) Parses a column containing a JSON string into a `StructType` or `ArrayType` + * of `StructType`s with the specified schema. Returns `null`, in the case of an unparseable + * string. * * @param e a string column containing JSON data. * @param schema the schema to use when parsing the json string as a json string. In Spark 2.1, @@ -3072,6 +3073,23 @@ object functions { * @since 2.1.0 */ def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column = { +from_json(e, schema, options.asScala.toMap) + } + + /** + * (Scala-specific) Parses a column containing a JSON string into a `StructType` or `ArrayType` + * of `StructType`s with the specified schema. Returns `null`, in the case of an unparseable + * string. + * + * @param e a string column containing JSON data. + * @param schema the schema to use when parsing the json string as a json string. In Spark 2.1, + * the user-provided schema has to be in JSON format. Since Spark 2.2, the DDL + * format is also supported for the schema. 
+ * + * @group collection_funcs + * @since 2.3.0 + */ + def from_json(e: Column, schema: String, options: Map[String, String]): Column = { val dataType = try { DataType.fromJson(schema) } catch { http://git-wip-us.apache.org/repos/asf/spark/blob/2dbe0c52/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index 69a500c..cf2d00f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -156,13 +156,20 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { Row(Seq(Row(1, "a"), Row(2, null), Row(null, null } - test("from_json uses DDL strings for defining a schema") { + test("from_json uses DDL strings for defining a schema - java") { val df = Seq("""{"a": 1, "b": "haa"}""").toDS() checkAnswer( df.select(from_json($"value", "a INT, b STRING", new java.util.HashMap[String, String]())), Row(Row(1, "haa")) :: Nil) } + test("from_json uses DDL strings for defining a schema - scala") { +val df = Seq("""{"a": 1, "b": "haa"}""").toDS() +checkAnswer( + df.select(from_json($"value", "a INT, b STRING", Map[String, String]())), + Row(Row(1, "haa")) :: Nil) + } + test("to_json - struct") { val df = Seq(Tuple1(Tuple1(1))).toDF("a")
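A minimal Scala caller for the new overload might look like the sketch below, assuming a Spark version that includes this change; the `allowSingleQuotes` entry is only an example of a JSON parsing option.

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json

object FromJsonScalaMapSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("from_json-scala-map").getOrCreate()
    import spark.implicits._

    val df = Seq("""{"a": 1, "b": "haa"}""").toDS()

    // The new overload takes a plain Scala Map for options, so Scala callers
    // no longer need to build a java.util.HashMap.
    val parsed = df.select(
      from_json($"value", "a INT, b STRING", Map("allowSingleQuotes" -> "true")).as("parsed"))

    parsed.show(truncate = false)
    spark.stop()
  }
}
```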
spark git commit: [SPARK-20888][SQL][DOCS] Document change of default setting of spark.sql.hive.caseSensitiveInferenceMode
Repository: spark Updated Branches: refs/heads/branch-2.2 7a21de9e2 -> 289dd170c [SPARK-20888][SQL][DOCS] Document change of default setting of spark.sql.hive.caseSensitiveInferenceMode (Link to Jira: https://issues.apache.org/jira/browse/SPARK-20888) ## What changes were proposed in this pull request? Document change of default setting of spark.sql.hive.caseSensitiveInferenceMode configuration key from NEVER_INFO to INFER_AND_SAVE in the Spark SQL 2.1 to 2.2 migration notes. Author: Michael AllmanCloses #18112 from mallman/spark-20888-document_infer_and_save. (cherry picked from commit c1e7989c4ffd83c51f5c97998b4ff6fe8dd83cf4) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/289dd170 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/289dd170 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/289dd170 Branch: refs/heads/branch-2.2 Commit: 289dd170cb3e0b9eca9af5841a0155ceaffee447 Parents: 7a21de9 Author: Michael Allman Authored: Fri May 26 09:25:43 2017 +0800 Committer: Wenchen Fan Committed: Fri May 26 09:26:16 2017 +0800 -- docs/sql-programming-guide.md | 6 +- 1 file changed, 5 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/289dd170/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 490c1ce..adb12d2 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1223,7 +1223,7 @@ the following case-insensitive options: This is a JDBC writer related option. If specified, this option allows setting of database-specific table and partition options when creating a table (e.g., CREATE TABLE t (name string) ENGINE=InnoDB.). This option applies only to writing. - + createTableColumnTypes @@ -1444,6 +1444,10 @@ options. # Migration Guide +## Upgrading From Spark SQL 2.1 to 2.2 + + - Spark 2.1.1 introduced a new configuration key: `spark.sql.hive.caseSensitiveInferenceMode`. It had a default setting of `NEVER_INFER`, which kept behavior identical to 2.1.0. However, Spark 2.2.0 changes this setting's default value to `INFER_AND_SAVE` to restore compatibility with reading Hive metastore tables whose underlying file schema have mixed-case column names. With the `INFER_AND_SAVE` configuration value, on first access Spark will perform schema inference on any Hive metastore table for which it has not already saved an inferred schema. Note that schema inference can be a very time consuming operation for tables with thousands of partitions. If compatibility with mixed-case column names is not a concern, you can safely set `spark.sql.hive.caseSensitiveInferenceMode` to `NEVER_INFER` to avoid the initial overhead of schema inference. Note that with the new default `INFER_AND_SAVE` setting, the results of the schema inference are saved as a metastore key for future use . Therefore, the initial schema inference occurs only at a table's first access. + ## Upgrading From Spark SQL 2.0 to 2.1 - Datasource tables now store partition metadata in the Hive metastore. This means that Hive DDLs such as `ALTER TABLE PARTITION ... SET LOCATION` are now available for tables created with the Datasource API. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20888][SQL][DOCS] Document change of default setting of spark.sql.hive.caseSensitiveInferenceMode
Repository: spark Updated Branches: refs/heads/master 98c385298 -> c1e7989c4 [SPARK-20888][SQL][DOCS] Document change of default setting of spark.sql.hive.caseSensitiveInferenceMode (Link to Jira: https://issues.apache.org/jira/browse/SPARK-20888) ## What changes were proposed in this pull request? Document change of default setting of spark.sql.hive.caseSensitiveInferenceMode configuration key from NEVER_INFO to INFER_AND_SAVE in the Spark SQL 2.1 to 2.2 migration notes. Author: Michael AllmanCloses #18112 from mallman/spark-20888-document_infer_and_save. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1e7989c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1e7989c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1e7989c Branch: refs/heads/master Commit: c1e7989c4ffd83c51f5c97998b4ff6fe8dd83cf4 Parents: 98c3852 Author: Michael Allman Authored: Fri May 26 09:25:43 2017 +0800 Committer: Wenchen Fan Committed: Fri May 26 09:25:43 2017 +0800 -- docs/sql-programming-guide.md | 6 +- 1 file changed, 5 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c1e7989c/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 490c1ce..adb12d2 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1223,7 +1223,7 @@ the following case-insensitive options: This is a JDBC writer related option. If specified, this option allows setting of database-specific table and partition options when creating a table (e.g., CREATE TABLE t (name string) ENGINE=InnoDB.). This option applies only to writing. - + createTableColumnTypes @@ -1444,6 +1444,10 @@ options. # Migration Guide +## Upgrading From Spark SQL 2.1 to 2.2 + + - Spark 2.1.1 introduced a new configuration key: `spark.sql.hive.caseSensitiveInferenceMode`. It had a default setting of `NEVER_INFER`, which kept behavior identical to 2.1.0. However, Spark 2.2.0 changes this setting's default value to `INFER_AND_SAVE` to restore compatibility with reading Hive metastore tables whose underlying file schema have mixed-case column names. With the `INFER_AND_SAVE` configuration value, on first access Spark will perform schema inference on any Hive metastore table for which it has not already saved an inferred schema. Note that schema inference can be a very time consuming operation for tables with thousands of partitions. If compatibility with mixed-case column names is not a concern, you can safely set `spark.sql.hive.caseSensitiveInferenceMode` to `NEVER_INFER` to avoid the initial overhead of schema inference. Note that with the new default `INFER_AND_SAVE` setting, the results of the schema inference are saved as a metastore key for future use . Therefore, the initial schema inference occurs only at a table's first access. + ## Upgrading From Spark SQL 2.0 to 2.1 - Datasource tables now store partition metadata in the Hive metastore. This means that Hive DDLs such as `ALTER TABLE PARTITION ... SET LOCATION` are now available for tables created with the Datasource API. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
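For users who want to avoid the schema-inference overhead described in the migration note, the setting can be restored to the old behaviour when the session is built, as in the sketch below (the table name is a placeholder), or equivalently with `--conf spark.sql.hive.caseSensitiveInferenceMode=NEVER_INFER` on spark-submit.

```
import org.apache.spark.sql.SparkSession

object CaseSensitiveInferenceSketch {
  def main(args: Array[String]): Unit = {
    // Opt out of the new INFER_AND_SAVE default; appropriate when no Hive
    // tables have mixed-case column names in their underlying files.
    val spark = SparkSession.builder()
      .appName("case-sensitive-inference")
      .config("spark.sql.hive.caseSensitiveInferenceMode", "NEVER_INFER")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("SELECT * FROM some_hive_table LIMIT 10").show()

    spark.stop()
  }
}
```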
spark git commit: [SPARK-20874][EXAMPLES] Add Structured Streaming Kafka Source to examples project
Repository: spark Updated Branches: refs/heads/branch-2.1 7fc2347b5 -> 4f6fccf15 [SPARK-20874][EXAMPLES] Add Structured Streaming Kafka Source to examples project ## What changes were proposed in this pull request? Add Structured Streaming Kafka Source to the `examples` project so that people can run `bin/run-example StructuredKafkaWordCount ...`. ## How was this patch tested? manually tested it. Author: Shixiong ZhuCloses #18101 from zsxwing/add-missing-example-dep. (cherry picked from commit 98c3852986a2cb5f2d249d6c8ef602be283bd90e) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f6fccf1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f6fccf1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f6fccf1 Branch: refs/heads/branch-2.1 Commit: 4f6fccf15d40da503a7c6a8722058c38d57178cc Parents: 7fc2347 Author: Shixiong Zhu Authored: Thu May 25 10:49:14 2017 -0700 Committer: Shixiong Zhu Committed: Thu May 25 10:49:32 2017 -0700 -- examples/pom.xml | 6 ++ 1 file changed, 6 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4f6fccf1/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index 8fa731f..f17e605 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -91,6 +91,12 @@ provided + org.apache.spark + spark-sql-kafka-0-10_${scala.binary.version} + ${project.version} + provided + + org.apache.commons commons-math3 provided - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20874][EXAMPLES] Add Structured Streaming Kafka Source to examples project
Repository: spark Updated Branches: refs/heads/branch-2.2 5ae1c6521 -> 7a21de9e2 [SPARK-20874][EXAMPLES] Add Structured Streaming Kafka Source to examples project ## What changes were proposed in this pull request? Add Structured Streaming Kafka Source to the `examples` project so that people can run `bin/run-example StructuredKafkaWordCount ...`. ## How was this patch tested? manually tested it. Author: Shixiong ZhuCloses #18101 from zsxwing/add-missing-example-dep. (cherry picked from commit 98c3852986a2cb5f2d249d6c8ef602be283bd90e) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a21de9e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a21de9e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a21de9e Branch: refs/heads/branch-2.2 Commit: 7a21de9e2bb0d9344a371a8570b2fffa68c3236e Parents: 5ae1c65 Author: Shixiong Zhu Authored: Thu May 25 10:49:14 2017 -0700 Committer: Shixiong Zhu Committed: Thu May 25 10:49:23 2017 -0700 -- examples/pom.xml | 6 ++ 1 file changed, 6 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7a21de9e/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index aa91e98..0d001ee 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -91,6 +91,12 @@ provided + org.apache.spark + spark-sql-kafka-0-10_${scala.binary.version} + ${project.version} + provided + + org.apache.commons commons-math3 provided - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20874][EXAMPLES] Add Structured Streaming Kafka Source to examples project
Repository: spark Updated Branches: refs/heads/master e9f983df2 -> 98c385298 [SPARK-20874][EXAMPLES] Add Structured Streaming Kafka Source to examples project ## What changes were proposed in this pull request? Add Structured Streaming Kafka Source to the `examples` project so that people can run `bin/run-example StructuredKafkaWordCount ...`. ## How was this patch tested? manually tested it. Author: Shixiong ZhuCloses #18101 from zsxwing/add-missing-example-dep. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/98c38529 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/98c38529 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/98c38529 Branch: refs/heads/master Commit: 98c3852986a2cb5f2d249d6c8ef602be283bd90e Parents: e9f983d Author: Shixiong Zhu Authored: Thu May 25 10:49:14 2017 -0700 Committer: Shixiong Zhu Committed: Thu May 25 10:49:14 2017 -0700 -- examples/pom.xml | 6 ++ 1 file changed, 6 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/98c38529/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index e674e79..81af735 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -91,6 +91,12 @@ provided + org.apache.spark + spark-sql-kafka-0-10_${scala.binary.version} + ${project.version} + provided + + org.apache.commons commons-math3 provided - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
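The dependency added above is what lets `bin/run-example StructuredKafkaWordCount ...` resolve the Kafka source. A stripped-down sketch of that kind of example follows (broker address and topic name are placeholders); it needs the `spark-sql-kafka-0-10` artifact on the classpath, which is exactly what the pom.xml change provides for the examples module.

```
import org.apache.spark.sql.SparkSession

object StructuredKafkaWordCountSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("structured-kafka-wordcount").getOrCreate()
    import spark.implicits._

    // Read a stream of lines from a Kafka topic.
    val lines = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
      .option("subscribe", "words")                        // placeholder topic
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // Split into words and keep a running count.
    val counts = lines.flatMap(_.split(" ")).groupBy("value").count()

    val query = counts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```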
spark git commit: [SPARK-19707][SPARK-18922][TESTS][SQL][CORE] Fix test failures/the invalid path check for sc.addJar on Windows
Repository: spark Updated Branches: refs/heads/master 7306d5569 -> e9f983df2 [SPARK-19707][SPARK-18922][TESTS][SQL][CORE] Fix test failures/the invalid path check for sc.addJar on Windows ## What changes were proposed in this pull request? This PR proposes two things: - A follow up for SPARK-19707 (Improving the invalid path check for sc.addJar on Windows as well). ``` org.apache.spark.SparkContextSuite: - add jar with invalid path *** FAILED *** (32 milliseconds) 2 was not equal to 1 (SparkContextSuite.scala:309) ... ``` - Fix path vs URI related test failures on Windows. ``` org.apache.spark.storage.LocalDirsSuite: - SPARK_LOCAL_DIRS override also affects driver *** FAILED *** (0 milliseconds) new java.io.File("/NONEXISTENT_PATH").exists() was true (LocalDirsSuite.scala:50) ... - Utils.getLocalDir() throws an exception if any temporary directory cannot be retrieved *** FAILED *** (15 milliseconds) Expected exception java.io.IOException to be thrown, but no exception was thrown. (LocalDirsSuite.scala:64) ... ``` ``` org.apache.spark.sql.hive.HiveSchemaInferenceSuite: - orc: schema should be inferred and saved when INFER_AND_SAVE is specified *** FAILED *** (203 milliseconds) java.net.URISyntaxException: Illegal character in opaque part at index 2: C:\projects\spark\target\tmp\spark-dae61ab3-a851-4dd3-bf4e-be97c501f254 ... - parquet: schema should be inferred and saved when INFER_AND_SAVE is specified *** FAILED *** (203 milliseconds) java.net.URISyntaxException: Illegal character in opaque part at index 2: C:\projects\spark\target\tmp\spark-fa3aff89-a66e-4376-9a37-2a9b87596939 ... - orc: schema should be inferred but not stored when INFER_ONLY is specified *** FAILED *** (141 milliseconds) java.net.URISyntaxException: Illegal character in opaque part at index 2: C:\projects\spark\target\tmp\spark-fb464e59-b049-481b-9c75-f53295c9fc2c ... - parquet: schema should be inferred but not stored when INFER_ONLY is specified *** FAILED *** (125 milliseconds) java.net.URISyntaxException: Illegal character in opaque part at index 2: C:\projects\spark\target\tmp\spark-9487568e-80a4-42b3-b0a5-d95314c4ccbc ... - orc: schema should not be inferred when NEVER_INFER is specified *** FAILED *** (156 milliseconds) java.net.URISyntaxException: Illegal character in opaque part at index 2: C:\projects\spark\target\tmp\spark-0d2dfa45-1b0f-4958-a8be-1074ed0135a ... - parquet: schema should not be inferred when NEVER_INFER is specified *** FAILED *** (547 milliseconds) java.net.URISyntaxException: Illegal character in opaque part at index 2: C:\projects\spark\target\tmp\spark-6d95d64e-613e-4a59-a0f6-d198c5aa51ee ... ``` ``` org.apache.spark.sql.execution.command.DDLSuite: - create temporary view using *** FAILED *** (15 milliseconds) org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-3881d9ca-561b-488d-90b9-97587472b853 mp; ... - insert data to a data source table which has a non-existing location should succeed *** FAILED *** (109 milliseconds) file:/C:projectsspark%09arget%09mpspark-4cad3d19-6085-4b75-b407-fe5e9d21df54 did not equal file:///C:/projects/spark/target/tmp/spark-4cad3d19-6085-4b75-b407-fe5e9d21df54 (DDLSuite.scala:1869) ... - insert into a data source table with a non-existing partition location should succeed *** FAILED *** (94 milliseconds) file:/C:projectsspark%09arget%09mpspark-4b52e7de-e3aa-42fd-95d4-6d4d58d1d95d did not equal file:///C:/projects/spark/target/tmp/spark-4b52e7de-e3aa-42fd-95d4-6d4d58d1d95d (DDLSuite.scala:1910) ... 
- read data from a data source table which has a non-existing location should succeed *** FAILED *** (93 milliseconds) file:/C:projectsspark%09arget%09mpspark-f8c281e2-08c2-4f73-abbf-f3865b702c34 did not equal file:///C:/projects/spark/target/tmp/spark-f8c281e2-08c2-4f73-abbf-f3865b702c34 (DDLSuite.scala:1937) ... - read data from a data source table with non-existing partition location should succeed *** FAILED *** (110 milliseconds) java.lang.IllegalArgumentException: Can not create a Path from an empty string ... - create datasource table with a non-existing location *** FAILED *** (94 milliseconds) file:/C:projectsspark%09arget%09mpspark-387316ae-070c-4e78-9b78-19ebf7b29ec8 did not equal file:///C:/projects/spark/target/tmp/spark-387316ae-070c-4e78-9b78-19ebf7b29ec8 (DDLSuite.scala:1982) ... - CTAS for external data source table with a non-existing location *** FAILED *** (16 milliseconds) java.lang.IllegalArgumentException: Can not create a Path from an empty string ... - CTAS for external data source table with a existed location *** FAILED *** (15 milliseconds) java.lang.IllegalArgumentException: Can not create a Path from an empty string ... - data source table:partition column name containing a b *** FAILED *** (125 milliseconds)
spark git commit: [SPARK-19707][SPARK-18922][TESTS][SQL][CORE] Fix test failures/the invalid path check for sc.addJar on Windows
Repository: spark Updated Branches: refs/heads/branch-2.2 022a4957d -> 5ae1c6521 [SPARK-19707][SPARK-18922][TESTS][SQL][CORE] Fix test failures/the invalid path check for sc.addJar on Windows ## What changes were proposed in this pull request? This PR proposes two things: - A follow up for SPARK-19707 (Improving the invalid path check for sc.addJar on Windows as well). ``` org.apache.spark.SparkContextSuite: - add jar with invalid path *** FAILED *** (32 milliseconds) 2 was not equal to 1 (SparkContextSuite.scala:309) ... ``` - Fix path vs URI related test failures on Windows. ``` org.apache.spark.storage.LocalDirsSuite: - SPARK_LOCAL_DIRS override also affects driver *** FAILED *** (0 milliseconds) new java.io.File("/NONEXISTENT_PATH").exists() was true (LocalDirsSuite.scala:50) ... - Utils.getLocalDir() throws an exception if any temporary directory cannot be retrieved *** FAILED *** (15 milliseconds) Expected exception java.io.IOException to be thrown, but no exception was thrown. (LocalDirsSuite.scala:64) ... ``` ``` org.apache.spark.sql.hive.HiveSchemaInferenceSuite: - orc: schema should be inferred and saved when INFER_AND_SAVE is specified *** FAILED *** (203 milliseconds) java.net.URISyntaxException: Illegal character in opaque part at index 2: C:\projects\spark\target\tmp\spark-dae61ab3-a851-4dd3-bf4e-be97c501f254 ... - parquet: schema should be inferred and saved when INFER_AND_SAVE is specified *** FAILED *** (203 milliseconds) java.net.URISyntaxException: Illegal character in opaque part at index 2: C:\projects\spark\target\tmp\spark-fa3aff89-a66e-4376-9a37-2a9b87596939 ... - orc: schema should be inferred but not stored when INFER_ONLY is specified *** FAILED *** (141 milliseconds) java.net.URISyntaxException: Illegal character in opaque part at index 2: C:\projects\spark\target\tmp\spark-fb464e59-b049-481b-9c75-f53295c9fc2c ... - parquet: schema should be inferred but not stored when INFER_ONLY is specified *** FAILED *** (125 milliseconds) java.net.URISyntaxException: Illegal character in opaque part at index 2: C:\projects\spark\target\tmp\spark-9487568e-80a4-42b3-b0a5-d95314c4ccbc ... - orc: schema should not be inferred when NEVER_INFER is specified *** FAILED *** (156 milliseconds) java.net.URISyntaxException: Illegal character in opaque part at index 2: C:\projects\spark\target\tmp\spark-0d2dfa45-1b0f-4958-a8be-1074ed0135a ... - parquet: schema should not be inferred when NEVER_INFER is specified *** FAILED *** (547 milliseconds) java.net.URISyntaxException: Illegal character in opaque part at index 2: C:\projects\spark\target\tmp\spark-6d95d64e-613e-4a59-a0f6-d198c5aa51ee ... ``` ``` org.apache.spark.sql.execution.command.DDLSuite: - create temporary view using *** FAILED *** (15 milliseconds) org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-3881d9ca-561b-488d-90b9-97587472b853 mp; ... - insert data to a data source table which has a non-existing location should succeed *** FAILED *** (109 milliseconds) file:/C:projectsspark%09arget%09mpspark-4cad3d19-6085-4b75-b407-fe5e9d21df54 did not equal file:///C:/projects/spark/target/tmp/spark-4cad3d19-6085-4b75-b407-fe5e9d21df54 (DDLSuite.scala:1869) ... - insert into a data source table with a non-existing partition location should succeed *** FAILED *** (94 milliseconds) file:/C:projectsspark%09arget%09mpspark-4b52e7de-e3aa-42fd-95d4-6d4d58d1d95d did not equal file:///C:/projects/spark/target/tmp/spark-4b52e7de-e3aa-42fd-95d4-6d4d58d1d95d (DDLSuite.scala:1910) ... 
- read data from a data source table which has a non-existing location should succeed *** FAILED *** (93 milliseconds) file:/C:projectsspark%09arget%09mpspark-f8c281e2-08c2-4f73-abbf-f3865b702c34 did not equal file:///C:/projects/spark/target/tmp/spark-f8c281e2-08c2-4f73-abbf-f3865b702c34 (DDLSuite.scala:1937) ... - read data from a data source table with non-existing partition location should succeed *** FAILED *** (110 milliseconds) java.lang.IllegalArgumentException: Can not create a Path from an empty string ... - create datasource table with a non-existing location *** FAILED *** (94 milliseconds) file:/C:projectsspark%09arget%09mpspark-387316ae-070c-4e78-9b78-19ebf7b29ec8 did not equal file:///C:/projects/spark/target/tmp/spark-387316ae-070c-4e78-9b78-19ebf7b29ec8 (DDLSuite.scala:1982) ... - CTAS for external data source table with a non-existing location *** FAILED *** (16 milliseconds) java.lang.IllegalArgumentException: Can not create a Path from an empty string ... - CTAS for external data source table with a existed location *** FAILED *** (15 milliseconds) java.lang.IllegalArgumentException: Can not create a Path from an empty string ... - data source table:partition column name containing a b *** FAILED *** (125 milliseconds)
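Most of the failures above have one shape: a URI built by string concatenation is compared with one produced by the JDK, and on Windows the two differ (backslashes, missing slashes after `file:`). The sketch below, meant to be run on a Windows JVM with a made-up path, shows the mismatch and the usual fix of going through `java.io.File.toURI`.

```
import java.io.File

object WindowsPathUriSketch {
  def main(args: Array[String]): Unit = {
    // A Windows-style temp directory similar to the ones in the failures above.
    val path = "C:\\projects\\spark\\target\\tmp\\spark-1234"

    // Naive concatenation keeps the backslashes and produces a string that is
    // not a well-formed file URI, so equality checks against real URIs fail.
    val naive = "file:/" + path

    // Converting through java.io.File normalizes separators and escaping,
    // yielding a proper file URI for the same location (on Windows).
    val proper = new File(path).toURI.toString

    println(s"naive : $naive")
    println(s"proper: $proper")
  }
}
```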
spark git commit: [SPARK-20741][SPARK SUBMIT] Added cleanup of JARs archive generated by SparkSubmit
Repository: spark Updated Branches: refs/heads/branch-2.2 e01f1f222 -> 022a4957d [SPARK-20741][SPARK SUBMIT] Added cleanup of JARs archive generated by SparkSubmit ## What changes were proposed in this pull request? Deleted generated JARs archive after distribution to HDFS ## How was this patch tested? Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Lior RegevCloses #17986 from liorregev/master. (cherry picked from commit 7306d556903c832984c7f34f1e8fe738a4b2343c) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/022a4957 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/022a4957 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/022a4957 Branch: refs/heads/branch-2.2 Commit: 022a4957d8dc8d6049e0a8c9191fcfd1bd95a4a4 Parents: e01f1f2 Author: Lior Regev Authored: Thu May 25 17:08:19 2017 +0100 Committer: Sean Owen Committed: Thu May 25 17:08:41 2017 +0100 -- .../yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/022a4957/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala -- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index b817570..9956071 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -545,6 +545,7 @@ private[spark] class Client( distribute(jarsArchive.toURI.getPath, resType = LocalResourceType.ARCHIVE, destName = Some(LOCALIZED_LIB_DIR)) + jarsArchive.delete() } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20741][SPARK SUBMIT] Added cleanup of JARs archive generated by SparkSubmit
Repository: spark Updated Branches: refs/heads/master 139da116f -> 7306d5569 [SPARK-20741][SPARK SUBMIT] Added cleanup of JARs archive generated by SparkSubmit ## What changes were proposed in this pull request? Deleted generated JARs archive after distribution to HDFS ## How was this patch tested? Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Lior RegevCloses #17986 from liorregev/master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7306d556 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7306d556 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7306d556 Branch: refs/heads/master Commit: 7306d556903c832984c7f34f1e8fe738a4b2343c Parents: 139da11 Author: Lior Regev Authored: Thu May 25 17:08:19 2017 +0100 Committer: Sean Owen Committed: Thu May 25 17:08:19 2017 +0100 -- .../yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7306d556/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala -- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index b817570..9956071 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -545,6 +545,7 @@ private[spark] class Client( distribute(jarsArchive.toURI.getPath, resType = LocalResourceType.ARCHIVE, destName = Some(LOCALIZED_LIB_DIR)) + jarsArchive.delete() } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
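The change itself is a single `jarsArchive.delete()` after the archive has been shipped to the staging directory. The sketch below restates the pattern outside of Spark; all names are stand-ins, and the real `Client.distribute` uploads to the Hadoop filesystem. It uses try/finally so the local archive is removed even if the upload fails, a slightly more defensive variant of the one-line fix.

```
import java.io.File
import java.nio.file.Files

object TempArchiveCleanupSketch {
  // Stand-in for Client.distribute(): pretend to upload the file to HDFS.
  def distribute(localPath: String): Unit =
    println(s"uploading $localPath to the staging directory")

  def main(args: Array[String]): Unit = {
    // The archive is only a staging artifact; once copied to HDFS the local
    // copy serves no purpose and only wastes disk space.
    val jarsArchive: File = Files.createTempFile("__spark_libs__", ".zip").toFile
    try {
      // ... zip the application jars into jarsArchive here ...
      distribute(jarsArchive.toURI.getPath)
    } finally {
      jarsArchive.delete()
    }
  }
}
```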
spark git commit: [SPARK-20768][PYSPARK][ML] Expose numPartitions (expert) param of PySpark FPGrowth.
Repository: spark Updated Branches: refs/heads/branch-2.2 9cbf39f1c -> e01f1f222 [SPARK-20768][PYSPARK][ML] Expose numPartitions (expert) param of PySpark FPGrowth. ## What changes were proposed in this pull request? Expose numPartitions (expert) param of PySpark FPGrowth. ## How was this patch tested? + [x] Pass all unit tests. Author: Yan Facai (é¢åæ)Closes #18058 from facaiy/ENH/pyspark_fpg_add_num_partition. (cherry picked from commit 139da116f130ed21481d3e9bdee5df4b8d7760ac) Signed-off-by: Yanbo Liang Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e01f1f22 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e01f1f22 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e01f1f22 Branch: refs/heads/branch-2.2 Commit: e01f1f222bcb7c469b1e1595e9338ed478d99894 Parents: 9cbf39f Author: Yan Facai (é¢åæ) Authored: Thu May 25 21:40:39 2017 +0800 Committer: Yanbo Liang Committed: Thu May 25 21:40:52 2017 +0800 -- python/pyspark/ml/fpm.py | 30 +- 1 file changed, 29 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e01f1f22/python/pyspark/ml/fpm.py -- diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py index 6ff7d2c..dd7dda5 100644 --- a/python/pyspark/ml/fpm.py +++ b/python/pyspark/ml/fpm.py @@ -49,6 +49,32 @@ class HasMinSupport(Params): return self.getOrDefault(self.minSupport) +class HasNumPartitions(Params): +""" +Mixin for param numPartitions: Number of partitions (at least 1) used by parallel FP-growth. +""" + +numPartitions = Param( +Params._dummy(), +"numPartitions", +"Number of partitions (at least 1) used by parallel FP-growth. " + +"By default the param is not set, " + +"and partition number of the input dataset is used.", +typeConverter=TypeConverters.toInt) + +def setNumPartitions(self, value): +""" +Sets the value of :py:attr:`numPartitions`. +""" +return self._set(numPartitions=value) + +def getNumPartitions(self): +""" +Gets the value of :py:attr:`numPartitions` or its default value. +""" +return self.getOrDefault(self.numPartitions) + + class HasMinConfidence(Params): """ Mixin for param minConfidence. @@ -127,7 +153,9 @@ class FPGrowthModel(JavaModel, JavaMLWritable, JavaMLReadable): class FPGrowth(JavaEstimator, HasItemsCol, HasPredictionCol, - HasMinSupport, HasMinConfidence, JavaMLWritable, JavaMLReadable): + HasMinSupport, HasNumPartitions, HasMinConfidence, + JavaMLWritable, JavaMLReadable): + """ .. note:: Experimental - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-20768][PYSPARK][ML] Expose numPartitions (expert) param of PySpark FPGrowth.
Repository: spark Updated Branches: refs/heads/master 913a6bfe4 -> 139da116f [SPARK-20768][PYSPARK][ML] Expose numPartitions (expert) param of PySpark FPGrowth. ## What changes were proposed in this pull request? Expose numPartitions (expert) param of PySpark FPGrowth. ## How was this patch tested? + [x] Pass all unit tests. Author: Yan Facai (é¢åæ)Closes #18058 from facaiy/ENH/pyspark_fpg_add_num_partition. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/139da116 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/139da116 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/139da116 Branch: refs/heads/master Commit: 139da116f130ed21481d3e9bdee5df4b8d7760ac Parents: 913a6bf Author: Yan Facai (é¢åæ) Authored: Thu May 25 21:40:39 2017 +0800 Committer: Yanbo Liang Committed: Thu May 25 21:40:39 2017 +0800 -- python/pyspark/ml/fpm.py | 30 +- 1 file changed, 29 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/139da116/python/pyspark/ml/fpm.py -- diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py index 6ff7d2c..dd7dda5 100644 --- a/python/pyspark/ml/fpm.py +++ b/python/pyspark/ml/fpm.py @@ -49,6 +49,32 @@ class HasMinSupport(Params): return self.getOrDefault(self.minSupport) +class HasNumPartitions(Params): +""" +Mixin for param numPartitions: Number of partitions (at least 1) used by parallel FP-growth. +""" + +numPartitions = Param( +Params._dummy(), +"numPartitions", +"Number of partitions (at least 1) used by parallel FP-growth. " + +"By default the param is not set, " + +"and partition number of the input dataset is used.", +typeConverter=TypeConverters.toInt) + +def setNumPartitions(self, value): +""" +Sets the value of :py:attr:`numPartitions`. +""" +return self._set(numPartitions=value) + +def getNumPartitions(self): +""" +Gets the value of :py:attr:`numPartitions` or its default value. +""" +return self.getOrDefault(self.numPartitions) + + class HasMinConfidence(Params): """ Mixin for param minConfidence. @@ -127,7 +153,9 @@ class FPGrowthModel(JavaModel, JavaMLWritable, JavaMLReadable): class FPGrowth(JavaEstimator, HasItemsCol, HasPredictionCol, - HasMinSupport, HasMinConfidence, JavaMLWritable, JavaMLReadable): + HasMinSupport, HasNumPartitions, HasMinConfidence, + JavaMLWritable, JavaMLReadable): + """ .. note:: Experimental - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
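Assuming the Scala side already exposes the same expert parameter (the PySpark mixin above wraps it), the equivalent usage looks like the sketch below, written in Scala to match the rest of the codebase; the transaction data is made up.

```
import org.apache.spark.ml.fpm.FPGrowth
import org.apache.spark.sql.SparkSession

object FPGrowthParamsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("fpgrowth-params").getOrCreate()
    import spark.implicits._

    val transactions = Seq(
      Array("r", "z", "h", "k", "p"),
      Array("z", "y", "x", "w", "v", "u", "t", "s"),
      Array("z"),
      Array("r", "x", "n", "o", "s"),
      Array("y", "r", "x", "z", "q", "t", "p"),
      Array("y", "z", "x", "e", "q", "s", "t", "m")
    ).toDF("items")

    val fpgrowth = new FPGrowth()
      .setItemsCol("items")
      .setMinSupport(0.5)
      .setMinConfidence(0.6)
      // The expert parameter this commit exposes to PySpark; it controls the
      // parallelism of the underlying parallel FP-growth computation.
      .setNumPartitions(4)

    val model = fpgrowth.fit(transactions)
    model.freqItemsets.show(truncate = false)
    model.associationRules.show(truncate = false)

    spark.stop()
  }
}
```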
spark git commit: [SPARK-19281][FOLLOWUP][ML] Minor fix for PySpark FPGrowth.
Repository: spark Updated Branches: refs/heads/branch-2.2 8896c4ee9 -> 9cbf39f1c [SPARK-19281][FOLLOWUP][ML] Minor fix for PySpark FPGrowth. ## What changes were proposed in this pull request? Follow-up for #17218, some minor fix for PySpark ```FPGrowth```. ## How was this patch tested? Existing UT. Author: Yanbo LiangCloses #18089 from yanboliang/spark-19281. (cherry picked from commit 913a6bfe4b0eb6b80a03b858ab4b2767194103de) Signed-off-by: Yanbo Liang Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9cbf39f1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9cbf39f1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9cbf39f1 Branch: refs/heads/branch-2.2 Commit: 9cbf39f1c74f16483865cd93d6ffc3c521e878a7 Parents: 8896c4e Author: Yanbo Liang Authored: Thu May 25 20:15:15 2017 +0800 Committer: Yanbo Liang Committed: Thu May 25 20:15:38 2017 +0800 -- python/pyspark/ml/fpm.py | 21 +++-- 1 file changed, 11 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9cbf39f1/python/pyspark/ml/fpm.py -- diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py index b30d4ed..6ff7d2c 100644 --- a/python/pyspark/ml/fpm.py +++ b/python/pyspark/ml/fpm.py @@ -23,17 +23,17 @@ from pyspark.ml.param.shared import * __all__ = ["FPGrowth", "FPGrowthModel"] -class HasSupport(Params): +class HasMinSupport(Params): """ -Mixin for param support. +Mixin for param minSupport. """ minSupport = Param( Params._dummy(), "minSupport", -"""Minimal support level of the frequent pattern. [0.0, 1.0]. -Any pattern that appears more than (minSupport * size-of-the-dataset) -times will be output""", +"Minimal support level of the frequent pattern. [0.0, 1.0]. " + +"Any pattern that appears more than (minSupport * size-of-the-dataset) " + +"times will be output in the frequent itemsets.", typeConverter=TypeConverters.toFloat) def setMinSupport(self, value): @@ -49,16 +49,17 @@ class HasSupport(Params): return self.getOrDefault(self.minSupport) -class HasConfidence(Params): +class HasMinConfidence(Params): """ -Mixin for param confidence. +Mixin for param minConfidence. """ minConfidence = Param( Params._dummy(), "minConfidence", -"""Minimal confidence for generating Association Rule. [0.0, 1.0] -Note that minConfidence has no effect during fitting.""", +"Minimal confidence for generating Association Rule. [0.0, 1.0]. " + +"minConfidence will not affect the mining for frequent itemsets, " + +"but will affect the association rules generation.", typeConverter=TypeConverters.toFloat) def setMinConfidence(self, value): @@ -126,7 +127,7 @@ class FPGrowthModel(JavaModel, JavaMLWritable, JavaMLReadable): class FPGrowth(JavaEstimator, HasItemsCol, HasPredictionCol, - HasSupport, HasConfidence, JavaMLWritable, JavaMLReadable): + HasMinSupport, HasMinConfidence, JavaMLWritable, JavaMLReadable): """ .. note:: Experimental - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-19281][FOLLOWUP][ML] Minor fix for PySpark FPGrowth.
Repository: spark Updated Branches: refs/heads/master 3f94e64aa -> 913a6bfe4 [SPARK-19281][FOLLOWUP][ML] Minor fix for PySpark FPGrowth. ## What changes were proposed in this pull request? Follow-up for #17218, some minor fix for PySpark ```FPGrowth```. ## How was this patch tested? Existing UT. Author: Yanbo LiangCloses #18089 from yanboliang/spark-19281. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/913a6bfe Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/913a6bfe Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/913a6bfe Branch: refs/heads/master Commit: 913a6bfe4b0eb6b80a03b858ab4b2767194103de Parents: 3f94e64 Author: Yanbo Liang Authored: Thu May 25 20:15:15 2017 +0800 Committer: Yanbo Liang Committed: Thu May 25 20:15:15 2017 +0800 -- python/pyspark/ml/fpm.py | 21 +++-- 1 file changed, 11 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/913a6bfe/python/pyspark/ml/fpm.py -- diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py index b30d4ed..6ff7d2c 100644 --- a/python/pyspark/ml/fpm.py +++ b/python/pyspark/ml/fpm.py @@ -23,17 +23,17 @@ from pyspark.ml.param.shared import * __all__ = ["FPGrowth", "FPGrowthModel"] -class HasSupport(Params): +class HasMinSupport(Params): """ -Mixin for param support. +Mixin for param minSupport. """ minSupport = Param( Params._dummy(), "minSupport", -"""Minimal support level of the frequent pattern. [0.0, 1.0]. -Any pattern that appears more than (minSupport * size-of-the-dataset) -times will be output""", +"Minimal support level of the frequent pattern. [0.0, 1.0]. " + +"Any pattern that appears more than (minSupport * size-of-the-dataset) " + +"times will be output in the frequent itemsets.", typeConverter=TypeConverters.toFloat) def setMinSupport(self, value): @@ -49,16 +49,17 @@ class HasSupport(Params): return self.getOrDefault(self.minSupport) -class HasConfidence(Params): +class HasMinConfidence(Params): """ -Mixin for param confidence. +Mixin for param minConfidence. """ minConfidence = Param( Params._dummy(), "minConfidence", -"""Minimal confidence for generating Association Rule. [0.0, 1.0] -Note that minConfidence has no effect during fitting.""", +"Minimal confidence for generating Association Rule. [0.0, 1.0]. " + +"minConfidence will not affect the mining for frequent itemsets, " + +"but will affect the association rules generation.", typeConverter=TypeConverters.toFloat) def setMinConfidence(self, value): @@ -126,7 +127,7 @@ class FPGrowthModel(JavaModel, JavaMLWritable, JavaMLReadable): class FPGrowth(JavaEstimator, HasItemsCol, HasPredictionCol, - HasSupport, HasConfidence, JavaMLWritable, JavaMLReadable): + HasMinSupport, HasMinConfidence, JavaMLWritable, JavaMLReadable): """ .. note:: Experimental - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-19659] Fetch big blocks to disk when shuffle-read.
Repository: spark Updated Branches: refs/heads/branch-2.2 b52a06d70 -> 8896c4ee9 [SPARK-19659] Fetch big blocks to disk when shuffle-read. ## What changes were proposed in this pull request? Currently the whole block is fetched into memory(off heap by default) when shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can be large when skew situations. If OOM happens during shuffle read, job will be killed and users will be notified to "Consider boosting spark.yarn.executor.memoryOverhead". Adjusting parameter and allocating more memory can resolve the OOM. However the approach is not perfectly suitable for production environment, especially for data warehouse. Using Spark SQL as data engine in warehouse, users hope to have a unified parameter(e.g. memory) but less resource wasted(resource is allocated but not used). The hope is strong especially when migrating data engine to Spark from another one(e.g. Hive). Tuning the parameter for thousands of SQLs one by one is very time consuming. It's not always easy to predict skew situations, when happen, it make sense to fetch remote blocks to disk for shuffle-read, rather than kill the job because of OOM. In this pr, I propose to fetch big blocks to disk(which is also mentioned in SPARK-3019): 1. Track average size and also the outliers(which are larger than 2*avgSize) in MapStatus; 2. Request memory from `MemoryManager` before fetch blocks and release the memory to `MemoryManager` when `ManagedBuffer` is released. 3. Fetch remote blocks to disk when failing acquiring memory from `MemoryManager`, otherwise fetch to memory. This is an improvement for memory control when shuffle blocks and help to avoid OOM in scenarios like below: 1. Single huge block; 2. Sizes of many blocks are underestimated in `MapStatus` and the actual footprint of blocks is much larger than the estimated. ## How was this patch tested? Added unit test in `MapStatusSuite` and `ShuffleBlockFetcherIteratorSuite`. Author: jinxingCloses #16989 from jinxing64/SPARK-19659. 
(cherry picked from commit 3f94e64aa8fd806ae1fa0156d846ce96afacddd3) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8896c4ee Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8896c4ee Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8896c4ee Branch: refs/heads/branch-2.2 Commit: 8896c4ee9ea315a7dcd1a05b7201e7ad0539a5ed Parents: b52a06d Author: jinxing Authored: Thu May 25 16:11:30 2017 +0800 Committer: Wenchen Fan Committed: Thu May 25 16:11:51 2017 +0800 -- .../network/server/OneForOneStreamManager.java | 21 + .../network/shuffle/ExternalShuffleClient.java | 7 +- .../network/shuffle/OneForOneBlockFetcher.java | 62 +- .../spark/network/shuffle/ShuffleClient.java| 4 +- .../network/sasl/SaslIntegrationSuite.java | 2 +- .../ExternalShuffleIntegrationSuite.java| 2 +- .../shuffle/OneForOneBlockFetcherSuite.java | 7 +- .../apache/spark/internal/config/package.scala | 6 ++ .../spark/network/BlockTransferService.scala| 7 +- .../netty/NettyBlockTransferService.scala | 7 +- .../spark/shuffle/BlockStoreShuffleReader.scala | 3 +- .../storage/ShuffleBlockFetcherIterator.scala | 71 ++-- .../apache/spark/MapOutputTrackerSuite.scala| 2 +- .../netty/NettyBlockTransferSecuritySuite.scala | 2 +- .../spark/storage/BlockManagerSuite.scala | 4 +- .../ShuffleBlockFetcherIteratorSuite.scala | 86 ++-- docs/configuration.md | 8 ++ 17 files changed, 254 insertions(+), 47 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8896c4ee/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java -- diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java index ee367f9..ad8e8b4 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java @@ -23,6 +23,8 @@ import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import scala.Tuple2; + import com.google.common.base.Preconditions; import io.netty.channel.Channel; import org.slf4j.Logger; @@ -95,6 +97,25 @@ public class OneForOneStreamManager extends StreamManager { } @Override +
spark git commit: [SPARK-19659] Fetch big blocks to disk when shuffle-read.
Repository: spark Updated Branches: refs/heads/master 731462a04 -> 3f94e64aa [SPARK-19659] Fetch big blocks to disk when shuffle-read. ## What changes were proposed in this pull request? Currently the whole block is fetched into memory(off heap by default) when shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can be large when skew situations. If OOM happens during shuffle read, job will be killed and users will be notified to "Consider boosting spark.yarn.executor.memoryOverhead". Adjusting parameter and allocating more memory can resolve the OOM. However the approach is not perfectly suitable for production environment, especially for data warehouse. Using Spark SQL as data engine in warehouse, users hope to have a unified parameter(e.g. memory) but less resource wasted(resource is allocated but not used). The hope is strong especially when migrating data engine to Spark from another one(e.g. Hive). Tuning the parameter for thousands of SQLs one by one is very time consuming. It's not always easy to predict skew situations, when happen, it make sense to fetch remote blocks to disk for shuffle-read, rather than kill the job because of OOM. In this pr, I propose to fetch big blocks to disk(which is also mentioned in SPARK-3019): 1. Track average size and also the outliers(which are larger than 2*avgSize) in MapStatus; 2. Request memory from `MemoryManager` before fetch blocks and release the memory to `MemoryManager` when `ManagedBuffer` is released. 3. Fetch remote blocks to disk when failing acquiring memory from `MemoryManager`, otherwise fetch to memory. This is an improvement for memory control when shuffle blocks and help to avoid OOM in scenarios like below: 1. Single huge block; 2. Sizes of many blocks are underestimated in `MapStatus` and the actual footprint of blocks is much larger than the estimated. ## How was this patch tested? Added unit test in `MapStatusSuite` and `ShuffleBlockFetcherIteratorSuite`. Author: jinxingCloses #16989 from jinxing64/SPARK-19659. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3f94e64a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3f94e64a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3f94e64a Branch: refs/heads/master Commit: 3f94e64aa8fd806ae1fa0156d846ce96afacddd3 Parents: 731462a Author: jinxing Authored: Thu May 25 16:11:30 2017 +0800 Committer: Wenchen Fan Committed: Thu May 25 16:11:30 2017 +0800 -- .../network/server/OneForOneStreamManager.java | 21 + .../network/shuffle/ExternalShuffleClient.java | 7 +- .../network/shuffle/OneForOneBlockFetcher.java | 62 +- .../spark/network/shuffle/ShuffleClient.java| 4 +- .../network/sasl/SaslIntegrationSuite.java | 2 +- .../ExternalShuffleIntegrationSuite.java| 2 +- .../shuffle/OneForOneBlockFetcherSuite.java | 7 +- .../apache/spark/internal/config/package.scala | 6 ++ .../spark/network/BlockTransferService.scala| 7 +- .../netty/NettyBlockTransferService.scala | 7 +- .../spark/shuffle/BlockStoreShuffleReader.scala | 3 +- .../storage/ShuffleBlockFetcherIterator.scala | 71 ++-- .../apache/spark/MapOutputTrackerSuite.scala| 2 +- .../netty/NettyBlockTransferSecuritySuite.scala | 2 +- .../spark/storage/BlockManagerSuite.scala | 4 +- .../ShuffleBlockFetcherIteratorSuite.scala | 86 ++-- docs/configuration.md | 8 ++ 17 files changed, 254 insertions(+), 47 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3f94e64a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java -- diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java index ee367f9..ad8e8b4 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java @@ -23,6 +23,8 @@ import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import scala.Tuple2; + import com.google.common.base.Preconditions; import io.netty.channel.Channel; import org.slf4j.Logger; @@ -95,6 +97,25 @@ public class OneForOneStreamManager extends StreamManager { } @Override + public ManagedBuffer openStream(String streamChunkId) { +Tuple2 streamIdAndChunkId =
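To make items 2-3 of the description concrete, the following is a hedged Scala sketch, under assumed names (`MemoryBroker`, `fetchToMemory`, `fetchToDisk`), of how a fetcher could reserve memory before pulling a remote block and fall back to writing it to a temp file when the reservation fails. It is not the actual `ShuffleBlockFetcherIterator`/`MemoryManager` code from this commit.

```scala
import java.io.File

// Assumed interface standing in for the real MemoryManager interaction.
trait MemoryBroker {
  def tryAcquire(bytes: Long): Boolean
  def release(bytes: Long): Unit
}

final class RemoteBlockFetcher(broker: MemoryBroker, tmpDir: File) {
  /** Fetch one remote block, returning either the spill file or an in-memory buffer. */
  def fetch(blockId: String, estimatedSize: Long)
           (fetchToMemory: String => Array[Byte],
            fetchToDisk: (String, File) => File): Either[File, Array[Byte]] = {
    if (broker.tryAcquire(estimatedSize)) {
      try {
        // In the real patch the reservation is released only when the
        // ManagedBuffer wrapping the block is released, not right away.
        Right(fetchToMemory(blockId))
      } catch {
        case e: Throwable =>
          broker.release(estimatedSize) // don't leak the reservation if the fetch fails
          throw e
      }
    } else {
      // Could not reserve memory: stream the block straight to a temp file on disk.
      Left(fetchToDisk(blockId, new File(tmpDir, s"$blockId.data")))
    }
  }
}
```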
spark git commit: [SPARK-20250][CORE] Improper OOM error when a task been killed while spilling data
Repository: spark Updated Branches: refs/heads/branch-2.0 79fbfbbc7 -> ef0ebdde0 [SPARK-20250][CORE] Improper OOM error when a task been killed while spilling data Currently, when a task is calling spill() but receives a kill request from the driver (e.g., for a speculative task), the `TaskMemoryManager` will throw an `OOM` exception, and we don't catch `Fatal` exceptions when the error is caused by `Thread.interrupt`. So for `ClosedByInterruptException`, we should throw a `RuntimeException` instead of an `OutOfMemoryError`. https://issues.apache.org/jira/browse/SPARK-20250?jql=project%20%3D%20SPARK Existing unit tests. Author: Xianyang Liu Closes #18090 from ConeyLiu/SPARK-20250. (cherry picked from commit 731462a04f8e33ac507ad19b4270c783a012a33e) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef0ebdde Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef0ebdde Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef0ebdde Branch: refs/heads/branch-2.0 Commit: ef0ebdde02cb130500af0ad79376563b15f921dc Parents: 79fbfbb Author: Xianyang Liu Authored: Thu May 25 15:47:59 2017 +0800 Committer: Wenchen Fan Committed: Thu May 25 15:52:31 2017 +0800 -- .../java/org/apache/spark/memory/TaskMemoryManager.java | 9 + 1 file changed, 9 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ef0ebdde/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java -- diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java index 867c4a1..23f6fd3 100644 --- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java +++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java @@ -19,6 +19,7 @@ package org.apache.spark.memory; import javax.annotation.concurrent.GuardedBy; import java.io.IOException; +import java.nio.channels.ClosedByInterruptException; import java.util.Arrays; import java.util.BitSet; import java.util.HashSet; @@ -156,6 +157,10 @@ public class TaskMemoryManager { break; } } +} catch (ClosedByInterruptException e) { + // This called by user to kill a task (e.g: speculative task). + logger.error("error while calling spill() on " + c, e); + throw new RuntimeException(e.getMessage()); } catch (IOException e) { logger.error("error while calling spill() on " + c, e); throw new OutOfMemoryError("error while calling spill() on " + c + " : " @@ -174,6 +179,10 @@ public class TaskMemoryManager { Utils.bytesToString(released), consumer); got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode); } +} catch (ClosedByInterruptException e) { + // This called by user to kill a task (e.g: speculative task). + logger.error("error while calling spill() on " + consumer, e); + throw new RuntimeException(e.getMessage()); } catch (IOException e) { logger.error("error while calling spill() on " + consumer, e); throw new OutOfMemoryError("error while calling spill() on " + consumer + " : "
spark git commit: [SPARK-20250][CORE] Improper OOM error when a task been killed while spilling data
Repository: spark Updated Branches: refs/heads/branch-2.1 7015f6f0e -> 7fc2347b5 [SPARK-20250][CORE] Improper OOM error when a task been killed while spilling data Currently, when a task is calling spill() but receives a kill request from the driver (e.g., for a speculative task), the `TaskMemoryManager` will throw an `OOM` exception, and we don't catch `Fatal` exceptions when the error is caused by `Thread.interrupt`. So for `ClosedByInterruptException`, we should throw a `RuntimeException` instead of an `OutOfMemoryError`. https://issues.apache.org/jira/browse/SPARK-20250?jql=project%20%3D%20SPARK Existing unit tests. Author: Xianyang Liu Closes #18090 from ConeyLiu/SPARK-20250. (cherry picked from commit 731462a04f8e33ac507ad19b4270c783a012a33e) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7fc2347b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7fc2347b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7fc2347b Branch: refs/heads/branch-2.1 Commit: 7fc2347b510d73fb55ab69c0579494b0761fb022 Parents: 7015f6f Author: Xianyang Liu Authored: Thu May 25 15:47:59 2017 +0800 Committer: Wenchen Fan Committed: Thu May 25 15:51:27 2017 +0800 -- .../java/org/apache/spark/memory/TaskMemoryManager.java | 9 + 1 file changed, 9 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7fc2347b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java -- diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java index c40974b..3385d0e 100644 --- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java +++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java @@ -19,6 +19,7 @@ package org.apache.spark.memory; import javax.annotation.concurrent.GuardedBy; import java.io.IOException; +import java.nio.channels.ClosedByInterruptException; import java.util.Arrays; import java.util.BitSet; import java.util.HashSet; @@ -156,6 +157,10 @@ public class TaskMemoryManager { break; } } +} catch (ClosedByInterruptException e) { + // This called by user to kill a task (e.g: speculative task). + logger.error("error while calling spill() on " + c, e); + throw new RuntimeException(e.getMessage()); } catch (IOException e) { logger.error("error while calling spill() on " + c, e); throw new OutOfMemoryError("error while calling spill() on " + c + " : " @@ -174,6 +179,10 @@ public class TaskMemoryManager { Utils.bytesToString(released), consumer); got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode); } +} catch (ClosedByInterruptException e) { + // This called by user to kill a task (e.g: speculative task). + logger.error("error while calling spill() on " + consumer, e); + throw new RuntimeException(e.getMessage()); } catch (IOException e) { logger.error("error while calling spill() on " + consumer, e); throw new OutOfMemoryError("error while calling spill() on " + consumer + " : "
spark git commit: [SPARK-20250][CORE] Improper OOM error when a task been killed while spilling data
Repository: spark Updated Branches: refs/heads/branch-2.2 e0aa23939 -> b52a06d70 [SPARK-20250][CORE] Improper OOM error when a task been killed while spilling data ## What changes were proposed in this pull request? Currently, when a task is calling spill() but receives a kill request from the driver (e.g., for a speculative task), the `TaskMemoryManager` will throw an `OOM` exception, and we don't catch `Fatal` exceptions when the error is caused by `Thread.interrupt`. So for `ClosedByInterruptException`, we should throw a `RuntimeException` instead of an `OutOfMemoryError`. https://issues.apache.org/jira/browse/SPARK-20250?jql=project%20%3D%20SPARK ## How was this patch tested? Existing unit tests. Author: Xianyang Liu Closes #18090 from ConeyLiu/SPARK-20250. (cherry picked from commit 731462a04f8e33ac507ad19b4270c783a012a33e) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b52a06d7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b52a06d7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b52a06d7 Branch: refs/heads/branch-2.2 Commit: b52a06d7034b3d392f7f0ee69a2fba098783e70d Parents: e0aa239 Author: Xianyang Liu Authored: Thu May 25 15:47:59 2017 +0800 Committer: Wenchen Fan Committed: Thu May 25 15:48:16 2017 +0800 -- .../java/org/apache/spark/memory/TaskMemoryManager.java | 9 + 1 file changed, 9 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b52a06d7/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java -- diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java index 5f91411..761ba9d 100644 --- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java +++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java @@ -19,6 +19,7 @@ package org.apache.spark.memory; import javax.annotation.concurrent.GuardedBy; import java.io.IOException; +import java.nio.channels.ClosedByInterruptException; import java.util.Arrays; import java.util.ArrayList; import java.util.BitSet; @@ -184,6 +185,10 @@ public class TaskMemoryManager { break; } } + } catch (ClosedByInterruptException e) { +// This called by user to kill a task (e.g: speculative task). +logger.error("error while calling spill() on " + c, e); +throw new RuntimeException(e.getMessage()); } catch (IOException e) { logger.error("error while calling spill() on " + c, e); throw new OutOfMemoryError("error while calling spill() on " + c + " : " @@ -201,6 +206,10 @@ public class TaskMemoryManager { Utils.bytesToString(released), consumer); got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode); } +} catch (ClosedByInterruptException e) { + // This called by user to kill a task (e.g: speculative task). + logger.error("error while calling spill() on " + consumer, e); + throw new RuntimeException(e.getMessage()); } catch (IOException e) { logger.error("error while calling spill() on " + consumer, e); throw new OutOfMemoryError("error while calling spill() on " + consumer + " : "
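As a rough, self-contained sketch of the pattern these three cherry-picks of SPARK-20250 introduce (written here as a standalone Scala helper rather than the actual Java `TaskMemoryManager` code): a spill that fails because the task was interrupted surfaces as a `RuntimeException`, while a genuine I/O failure still becomes an `OutOfMemoryError`.

```scala
import java.io.IOException
import java.nio.channels.ClosedByInterruptException

object SpillErrorHandling {
  /** Runs a spill action and translates its failures along the lines of the patch. */
  def spillOrFail(target: String)(spill: () => Long): Long = {
    try {
      spill()
    } catch {
      case e: ClosedByInterruptException =>
        // The task was killed (e.g. a speculative copy was cancelled); not an OOM.
        throw new RuntimeException("error while calling spill() on " + target, e)
      case e: IOException =>
        // A real I/O failure during spill keeps the original OOM behaviour.
        throw new OutOfMemoryError(
          "error while calling spill() on " + target + " : " + e.getMessage)
    }
  }
}
```

The point of the distinction is that a `RuntimeException` is handled like any other task failure, whereas an `OutOfMemoryError` would misleadingly suggest the executor ran out of memory when the task was simply killed mid-spill.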