spark git commit: [SPARK-13652][CORE] Copy ByteBuffer in sendRpcSync as it will be recycled
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 b3a512965 -> 51c676e46

[SPARK-13652][CORE] Copy ByteBuffer in sendRpcSync as it will be recycled

## What changes were proposed in this pull request?

`sendRpcSync` should copy the response content because the underlying buffer will be recycled and reused.

## How was this patch tested?

Jenkins unit tests.

Author: Shixiong Zhu

Closes #11499 from zsxwing/SPARK-13652.

(cherry picked from commit 465c665db1dc65e3b02c584cf7f8d06b24909b0c)
Signed-off-by: Shixiong Zhu

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51c676e4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51c676e4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51c676e4

Branch: refs/heads/branch-1.6
Commit: 51c676e46c9be28aa9ac37fda45482b38f2eb1d5
Parents: b3a5129
Author: Shixiong Zhu
Authored: Thu Mar 3 22:53:07 2016 -0800
Committer: Shixiong Zhu
Committed: Thu Mar 3 22:53:21 2016 -0800

----
 .../org/apache/spark/network/client/RpcResponseCallback.java  | 7 ++-
 .../java/org/apache/spark/network/client/TransportClient.java | 6 +-
 2 files changed, 11 insertions(+), 2 deletions(-)
----

http://git-wip-us.apache.org/repos/asf/spark/blob/51c676e4/network/common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java

diff --git a/network/common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java b/network/common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java
index 47e93f9..6afc63f 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java
@@ -24,7 +24,12 @@ import java.nio.ByteBuffer;
  * failure.
  */
 public interface RpcResponseCallback {
-  /** Successful serialized result from server. */
+  /**
+   * Successful serialized result from server.
+   *
+   * After `onSuccess` returns, `response` will be recycled and its content will become invalid.
+   * Please copy the content of `response` if you want to use it after `onSuccess` returns.
+   */
   void onSuccess(ByteBuffer response);

   /** Exception either propagated from server or raised on client side. */

http://git-wip-us.apache.org/repos/asf/spark/blob/51c676e4/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java

diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
index c49ca4d..13debf5 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -257,7 +257,11 @@ public class TransportClient implements Closeable {
     sendRpc(message, new RpcResponseCallback() {
       @Override
       public void onSuccess(ByteBuffer response) {
-        result.set(response);
+        ByteBuffer copy = ByteBuffer.allocate(response.remaining());
+        copy.put(response);
+        // flip "copy" to make it readable
+        copy.flip();
+        result.set(copy);
       }

       @Override
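The fix is the standard NIO idiom for detaching data from a buffer whose backing memory may be pooled. A minimal Scala sketch of the same idiom (the helper name is illustrative, not part of the patch):

```scala
import java.nio.ByteBuffer

// Copy a response buffer whose backing memory may be recycled by the caller
// after the callback returns.
def copyResponse(response: ByteBuffer): ByteBuffer = {
  val copy = ByteBuffer.allocate(response.remaining())
  copy.put(response) // consumes `response`, advancing both positions
  copy.flip()        // reset position to 0 so the copy is readable
  copy
}
```

Without the `flip()`, the copy's position would sit at its limit and any subsequent read would see an empty buffer.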
spark git commit: [SPARK-13652][CORE] Copy ByteBuffer in sendRpcSync as it will be recycled
Repository: spark
Updated Branches:
  refs/heads/master f6ac7c30d -> 465c665db

[SPARK-13652][CORE] Copy ByteBuffer in sendRpcSync as it will be recycled

## What changes were proposed in this pull request?

`sendRpcSync` should copy the response content because the underlying buffer will be recycled and reused.

## How was this patch tested?

Jenkins unit tests.

Author: Shixiong Zhu

Closes #11499 from zsxwing/SPARK-13652.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/465c665d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/465c665d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/465c665d

Branch: refs/heads/master
Commit: 465c665db1dc65e3b02c584cf7f8d06b24909b0c
Parents: f6ac7c3
Author: Shixiong Zhu
Authored: Thu Mar 3 22:53:07 2016 -0800
Committer: Shixiong Zhu
Committed: Thu Mar 3 22:53:07 2016 -0800

----
 .../org/apache/spark/network/client/RpcResponseCallback.java  | 7 ++-
 .../java/org/apache/spark/network/client/TransportClient.java | 6 +-
 2 files changed, 11 insertions(+), 2 deletions(-)
----

http://git-wip-us.apache.org/repos/asf/spark/blob/465c665d/common/network-common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java

diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java b/common/network-common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java
index 47e93f9..6afc63f 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/RpcResponseCallback.java
@@ -24,7 +24,12 @@ import java.nio.ByteBuffer;
  * failure.
  */
 public interface RpcResponseCallback {
-  /** Successful serialized result from server. */
+  /**
+   * Successful serialized result from server.
+   *
+   * After `onSuccess` returns, `response` will be recycled and its content will become invalid.
+   * Please copy the content of `response` if you want to use it after `onSuccess` returns.
+   */
   void onSuccess(ByteBuffer response);

   /** Exception either propagated from server or raised on client side. */

http://git-wip-us.apache.org/repos/asf/spark/blob/465c665d/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java

diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
index e15f096..64a8317 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -257,7 +257,11 @@ public class TransportClient implements Closeable {
     sendRpc(message, new RpcResponseCallback() {
       @Override
       public void onSuccess(ByteBuffer response) {
-        result.set(response);
+        ByteBuffer copy = ByteBuffer.allocate(response.remaining());
+        copy.put(response);
+        // flip "copy" to make it readable
+        copy.flip();
+        result.set(copy);
       }

       @Override
spark git commit: [SPARK-12941][SQL][MASTER] Spark-SQL JDBC Oracle dialect fails to map string datatypes to Oracle VARCHAR datatype mapping
Repository: spark
Updated Branches:
  refs/heads/master 15d57f9c2 -> f6ac7c30d

[SPARK-12941][SQL][MASTER] Spark-SQL JDBC Oracle dialect fails to map string datatypes to Oracle VARCHAR datatype mapping

## What changes were proposed in this pull request?

A test suite is added for the bug fix SPARK-12941: the mapping of StringType to the corresponding VARCHAR datatype in Oracle.

## How was this patch tested?

Manual tests done.

Author: thomastechs
Author: THOMAS SEBASTIAN

Closes #11489 from thomastechs/thomastechs-12941-master-new.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6ac7c30
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6ac7c30
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6ac7c30

Branch: refs/heads/master
Commit: f6ac7c30d48e18466e825578fa457e2a0ed4
Parents: 15d57f9
Author: thomastechs
Authored: Thu Mar 3 20:35:40 2016 -0800
Committer: Yin Huai
Committed: Thu Mar 3 20:35:40 2016 -0800

----
 .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 6 ++
 1 file changed, 6 insertions(+)
----

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ac7c30/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index b0d64aa..dfffa58 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -645,4 +645,10 @@ class JDBCSuite extends SparkFunSuite
       r => assert(!List("testPass", "testUser").exists(r.toString.contains))
     }
   }
+
+  test("SPARK 12941: The data type mapping for StringType to Oracle") {
+    val oracleDialect = JdbcDialects.get("jdbc:oracle://127.0.0.1/db")
+    assert(oracleDialect.getJDBCType(StringType).
+      map(_.databaseTypeDefinition).get == "VARCHAR2(255)")
+  }
 }
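For context, a hedged sketch of the write-side mapping this test exercises: Oracle has no unbounded VARCHAR for this purpose, so StringType is mapped to a bounded VARCHAR2. This is a simplification, not the full OracleDialect:

```scala
import java.sql.Types
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType}
import org.apache.spark.sql.types.{DataType, StringType}

// Simplified sketch of the dialect mapping that the JDBCSuite test above asserts.
object OracleDialectSketch extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")

  // Map Spark's StringType to a bounded VARCHAR2 when creating Oracle tables.
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Some(JdbcType("VARCHAR2(255)", Types.VARCHAR))
    case _ => None
  }
}
```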
spark git commit: [SPARK-13647] [SQL] also check if numeric value is within allowed range in _verify_type
Repository: spark
Updated Branches:
  refs/heads/master d062587dd -> 15d57f9c2

[SPARK-13647] [SQL] also check if numeric value is within allowed range in _verify_type

## What changes were proposed in this pull request?

This PR makes the `_verify_type` in `types.py` more strict, also check if numeric value is within allowed range.

## How was this patch tested?

newly added doc test.

Author: Wenchen Fan

Closes #11492 from cloud-fan/py-verify.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/15d57f9c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/15d57f9c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/15d57f9c

Branch: refs/heads/master
Commit: 15d57f9c23145ace37d1631d8f9c19675c142214
Parents: d062587
Author: Wenchen Fan
Authored: Thu Mar 3 20:16:37 2016 -0800
Committer: Davies Liu
Committed: Thu Mar 3 20:16:37 2016 -0800

----
 python/pyspark/sql/types.py | 27 ---
 1 file changed, 24 insertions(+), 3 deletions(-)
----

http://git-wip-us.apache.org/repos/asf/spark/blob/15d57f9c/python/pyspark/sql/types.py

diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 5bc0773..d1f5b47 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -1093,8 +1093,11 @@ _acceptable_types = {

 def _verify_type(obj, dataType):
     """
-    Verify the type of obj against dataType, raise an exception if
-    they do not match.
+    Verify the type of obj against dataType, raise a TypeError if they do not match.
+
+    Also verify the value of obj against datatype, raise a ValueError if it's not within the allowed
+    range, e.g. using 128 as ByteType will overflow. Note that, Python float is not checked, so it
+    will become infinity when cast to Java float if it overflows.

     >>> _verify_type(None, StructType([]))
     >>> _verify_type("", StringType())
@@ -1111,6 +1114,12 @@ def _verify_type(obj, dataType):
     Traceback (most recent call last):
         ...
     ValueError:...
+    >>> # Check if numeric values are within the allowed range.
+    >>> _verify_type(12, ByteType())
+    >>> _verify_type(1234, ByteType()) # doctest: +IGNORE_EXCEPTION_DETAIL
+    Traceback (most recent call last):
+        ...
+    ValueError:...
     """
     # all objects are nullable
     if obj is None:
@@ -1137,7 +1146,19 @@ def _verify_type(obj, dataType):
     if type(obj) not in _acceptable_types[_type]:
         raise TypeError("%s can not accept object %r in type %s" % (dataType, obj, type(obj)))

-    if isinstance(dataType, ArrayType):
+    if isinstance(dataType, ByteType):
+        if obj < -128 or obj > 127:
+            raise ValueError("object of ByteType out of range, got: %s" % obj)
+
+    elif isinstance(dataType, ShortType):
+        if obj < -32768 or obj > 32767:
+            raise ValueError("object of ShortType out of range, got: %s" % obj)
+
+    elif isinstance(dataType, IntegerType):
+        if obj < -2147483648 or obj > 2147483647:
+            raise ValueError("object of IntegerType out of range, got: %s" % obj)
+
+    elif isinstance(dataType, ArrayType):
         for i in obj:
             _verify_type(i, dataType.elementType)
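The literal bounds in the patch are exactly the JVM ranges the Python values must fit into once they cross into the executor. A hedged Scala mirror of the same check, using the JVM's own constants (a hypothetical helper, not part of the patch):

```scala
// Hypothetical mirror of the Python bounds check above.
def verifyInRange(v: Long, typeName: String): Unit = typeName match {
  case "byte" =>
    require(v >= Byte.MinValue && v <= Byte.MaxValue, s"object of ByteType out of range, got: $v")
  case "short" =>
    require(v >= Short.MinValue && v <= Short.MaxValue, s"object of ShortType out of range, got: $v")
  case "int" =>
    require(v >= Int.MinValue && v <= Int.MaxValue, s"object of IntegerType out of range, got: $v")
  case other =>
    throw new IllegalArgumentException(s"unknown type: $other")
}

verifyInRange(12, "byte")    // passes, as in the doctest above
// verifyInRange(1234, "byte") would throw IllegalArgumentException
```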
spark git commit: [SPARK-13601] [TESTS] use 1 partition in tests to avoid race conditions
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 fa86dc47a -> b3a512965

[SPARK-13601] [TESTS] use 1 partition in tests to avoid race conditions

Fix race conditions when cleaning up files.

Existing tests.

Author: Davies Liu

Closes #11507 from davies/flaky.

(cherry picked from commit d062587dd2c4ed13998ee8bcc9d08f29734df228)
Signed-off-by: Davies Liu

Conflicts:
  sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b3a51296
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b3a51296
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b3a51296

Branch: refs/heads/branch-1.6
Commit: b3a512965d9c27a7078448a6e8eae9ecfcaf1553
Parents: fa86dc4
Author: Davies Liu
Authored: Thu Mar 3 17:46:28 2016 -0800
Committer: Davies Liu
Committed: Thu Mar 3 17:47:56 2016 -0800

----
 .../spark/sql/sources/CommitFailureTestRelationSuite.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----

http://git-wip-us.apache.org/repos/asf/spark/blob/b3a51296/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala
index 793be8d..8438349 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala
@@ -51,7 +51,7 @@ class CommitFailureTestRelationSuite extends SQLTestUtils with TestHiveSingleton
     withTempPath { file =>
       // fail the job in the middle of writing
       val divideByZero = udf((x: Int) => { x / (x - 1)})
-      val df = sqlContext.range(0, 10).select(divideByZero(col("id")))
+      val df = sqlContext.range(0, 10).coalesce(1).select(divideByZero(col("id")))

       SimpleTextRelation.callbackCalled = false
       intercept[SparkException] {
@@ -67,7 +67,8 @@ class CommitFailureTestRelationSuite extends SQLTestUtils with TestHiveSingleton
   test("call failure callbacks before close writer - partitioned") {
     SimpleTextRelation.failCommitter = false
     withTempPath { file =>
-      val df = sqlContext.range(0, 10).select(col("id").mod(2).as("key"), col("id"))
+      // fail the job in the middle of writing
+      val df = sqlContext.range(0, 10).coalesce(1).select(col("id").mod(2).as("key"), col("id"))

       SimpleTextRelation.callbackCalled = false
       SimpleTextRelation.failWriter = true
spark git commit: [SPARK-13601] [TESTS] use 1 partition in tests to avoid race conditions
Repository: spark
Updated Branches:
  refs/heads/master b373a8886 -> d062587dd

[SPARK-13601] [TESTS] use 1 partition in tests to avoid race conditions

## What changes were proposed in this pull request?

Fix race conditions when cleaning up files.

## How was this patch tested?

Existing tests.

Author: Davies Liu

Closes #11507 from davies/flaky.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d062587d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d062587d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d062587d

Branch: refs/heads/master
Commit: d062587dd2c4ed13998ee8bcc9d08f29734df228
Parents: b373a88
Author: Davies Liu
Authored: Thu Mar 3 17:46:28 2016 -0800
Committer: Davies Liu
Committed: Thu Mar 3 17:46:28 2016 -0800

----
 .../spark/sql/sources/CommitFailureTestRelationSuite.scala | 7 ---
 1 file changed, 4 insertions(+), 3 deletions(-)
----

http://git-wip-us.apache.org/repos/asf/spark/blob/d062587d/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala
index 64c27da..2058705 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala
@@ -51,7 +51,7 @@ class CommitFailureTestRelationSuite extends SQLTestUtils with TestHiveSingleton
     withTempPath { file =>
       // fail the job in the middle of writing
       val divideByZero = udf((x: Int) => { x / (x - 1)})
-      val df = sqlContext.range(0, 10).select(divideByZero(col("id")))
+      val df = sqlContext.range(0, 10).coalesce(1).select(divideByZero(col("id")))

       SimpleTextRelation.callbackCalled = false
       intercept[SparkException] {
@@ -69,7 +69,8 @@ class CommitFailureTestRelationSuite extends SQLTestUtils with TestHiveSingleton
     withTempPath { file =>
       // fail the job in the middle of writing
       val divideByZero = udf((x: Int) => { x / (x - 1)})
-      val df = sqlContext.range(0, 10).select(col("id").mod(2).as("key"), divideByZero(col("id")))
+      val df = sqlContext.range(0, 10).coalesce(1)
+        .select(col("id").mod(2).as("key"), divideByZero(col("id")))

       SimpleTextRelation.callbackCalled = false
       intercept[SparkException] {
@@ -87,7 +88,7 @@ class CommitFailureTestRelationSuite extends SQLTestUtils with TestHiveSingleton
     SimpleTextRelation.failCommitter = false
     withTempPath { file =>
       // fail the job in the middle of writing
-      val df = sqlContext.range(0, 10).select(col("id").mod(2).as("key"), col("id"))
+      val df = sqlContext.range(0, 10).coalesce(1).select(col("id").mod(2).as("key"), col("id"))

       SimpleTextRelation.callbackCalled = false
       SimpleTextRelation.failWriter = true
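The essence of the fix: with several partitions, several concurrent tasks hit the failure callback and race to clean up the same temporary output; coalescing to one partition leaves exactly one writer task. A minimal sketch of the pattern, mirroring the test code above (illustrative helper name):

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.{col, udf}

def buildSinglePartitionDF(sqlContext: SQLContext) = {
  val divideByZero = udf((x: Int) => x / (x - 1)) // fails when x == 1
  sqlContext.range(0, 10)
    .coalesce(1) // single partition => a single writer task, so no cleanup race
    .select(divideByZero(col("id")))
}
```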
spark git commit: [SPARK-13415][SQL] Visualize subquery in SQL web UI
Repository: spark
Updated Branches:
  refs/heads/master ad0de99f3 -> b373a8886

[SPARK-13415][SQL] Visualize subquery in SQL web UI

## What changes were proposed in this pull request?

This PR supports visualization for subqueries in the SQL web UI, and also improves the explain output for subqueries, especially when they are used together with whole stage codegen. For example:

```python
>>> sqlContext.range(100).registerTempTable("range")
>>> sqlContext.sql("select id / (select sum(id) from range) from range where id > (select id from range limit 1)").explain(True)
== Parsed Logical Plan ==
'Project [unresolvedalias(('id / subquery#9), None)]
:  +- 'SubqueryAlias subquery#9
:     +- 'Project [unresolvedalias('sum('id), None)]
:        +- 'UnresolvedRelation `range`, None
+- 'Filter ('id > subquery#8)
   :  +- 'SubqueryAlias subquery#8
   :     +- 'GlobalLimit 1
   :        +- 'LocalLimit 1
   :           +- 'Project [unresolvedalias('id, None)]
   :              +- 'UnresolvedRelation `range`, None
   +- 'UnresolvedRelation `range`, None

== Analyzed Logical Plan ==
(id / scalarsubquery()): double
Project [(cast(id#0L as double) / cast(subquery#9 as double)) AS (id / scalarsubquery())#11]
:  +- SubqueryAlias subquery#9
:     +- Aggregate [(sum(id#0L),mode=Complete,isDistinct=false) AS sum(id)#10L]
:        +- SubqueryAlias range
:           +- Range 0, 100, 1, 4, [id#0L]
+- Filter (id#0L > subquery#8)
   :  +- SubqueryAlias subquery#8
   :     +- GlobalLimit 1
   :        +- LocalLimit 1
   :           +- Project [id#0L]
   :              +- SubqueryAlias range
   :                 +- Range 0, 100, 1, 4, [id#0L]
   +- SubqueryAlias range
      +- Range 0, 100, 1, 4, [id#0L]

== Optimized Logical Plan ==
Project [(cast(id#0L as double) / cast(subquery#9 as double)) AS (id / scalarsubquery())#11]
:  +- SubqueryAlias subquery#9
:     +- Aggregate [(sum(id#0L),mode=Complete,isDistinct=false) AS sum(id)#10L]
:        +- Range 0, 100, 1, 4, [id#0L]
+- Filter (id#0L > subquery#8)
   :  +- SubqueryAlias subquery#8
   :     +- GlobalLimit 1
   :        +- LocalLimit 1
   :           +- Project [id#0L]
   :              +- Range 0, 100, 1, 4, [id#0L]
   +- Range 0, 100, 1, 4, [id#0L]

== Physical Plan ==
WholeStageCodegen
:  +- Project [(cast(id#0L as double) / cast(subquery#9 as double)) AS (id / scalarsubquery())#11]
:     :  +- Subquery subquery#9
:     :     +- WholeStageCodegen
:     :        :  +- TungstenAggregate(key=[], functions=[(sum(id#0L),mode=Final,isDistinct=false)], output=[sum(id)#10L])
:     :        :     +- INPUT
:     :        +- Exchange SinglePartition, None
:     :           +- WholeStageCodegen
:     :              :  +- TungstenAggregate(key=[], functions=[(sum(id#0L),mode=Partial,isDistinct=false)], output=[sum#14L])
:     :              :     +- Range 0, 1, 4, 100, [id#0L]
:     +- Filter (id#0L > subquery#8)
:        :  +- Subquery subquery#8
:        :     +- CollectLimit 1
:        :        +- WholeStageCodegen
:        :           :  +- Project [id#0L]
:        :           :     +- Range 0, 1, 4, 100, [id#0L]
:        +- Range 0, 1, 4, 100, [id#0L]
```

The web UI looks like:

![subquery](https://cloud.githubusercontent.com/assets/40902/13377963/932bcbae-dda7-11e5-82f7-03c9be85d77c.png)

This PR also changes the tree structure of WholeStageCodegen to make it consistent with other operators. Before this change, both WholeStageCodegen and InputAdapter held references to the same plans; those could be updated without notifying the other, causing problems. This was discovered by #11403.

## How was this patch tested?

Existing tests, plus manual tests with the example query, checking the explain output and the web UI.

Author: Davies Liu

Closes #11417 from davies/viz_subquery.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b373a888
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b373a888
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b373a888

Branch: refs/heads/master
Commit: b373a888621ba6f0dd499f47093d4e2e42086dfc
Parents: ad0de99
Author: Davies Liu
Authored: Thu Mar 3 17:36:48 2016 -0800
Committer: Yin Huai
Committed: Thu Mar 3 17:36:48 2016 -0800

----
 .../spark/sql/catalyst/plans/QueryPlan.scala    |  10 +-
 .../spark/sql/catalyst/trees/TreeNode.scala     |  49 ....
 .../spark/sql/execution/SparkPlanInfo.scala     |   7 +-
 .../spark/sql/execution/WholeStageCodegen.scala | 113 ---
 .../spark/sql/execution/debug/package.scala     |  23 +++-
 .../spark/sql/execution/ui/SparkPlanGraph.scala |  66 ++-
 .../sql/execution/WholeStageCodegenSuite.scala  |  17 +--
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   |   6 +-
 .../spark/sql/util/DataFrameCallbackSuite.scala |   2 +-
 9 files changed, 166 insertions(+), 127 deletions(-)
----
spark git commit: [SPARK-13584][SQL][TESTS] Make ContinuousQueryManagerSuite not output logs to the console
Repository: spark
Updated Branches:
  refs/heads/master 3edcc4022 -> ad0de99f3

[SPARK-13584][SQL][TESTS] Make ContinuousQueryManagerSuite not output logs to the console

## What changes were proposed in this pull request?

Make ContinuousQueryManagerSuite not output logs to the console. The logs will still output to `unit-tests.log`.

I also updated `SQLListenerMemoryLeakSuite` to use `quietly` to avoid changing the log level, which won't output logs to `unit-tests.log`.

## How was this patch tested?

Just check Jenkins output.

Author: Shixiong Zhu

Closes #11439 from zsxwing/quietly-ContinuousQueryManagerSuite.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad0de99f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad0de99f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad0de99f

Branch: refs/heads/master
Commit: ad0de99f3d3167990d501297f1df069fe15e0678
Parents: 3edcc40
Author: Shixiong Zhu
Authored: Thu Mar 3 15:41:56 2016 -0800
Committer: Shixiong Zhu
Committed: Thu Mar 3 15:41:56 2016 -0800

----
 sql/core/src/test/resources/log4j.properties         |  1 +
 .../spark/sql/execution/ui/SQLListenerSuite.scala    |  7 ++-
 .../sql/streaming/ContinuousQueryManagerSuite.scala  |  8 
 .../org/apache/spark/sql/test/SQLTestUtils.scala     | 14 ++
 4 files changed, 21 insertions(+), 9 deletions(-)
----

http://git-wip-us.apache.org/repos/asf/spark/blob/ad0de99f/sql/core/src/test/resources/log4j.properties

diff --git a/sql/core/src/test/resources/log4j.properties b/sql/core/src/test/resources/log4j.properties
index 12fb128..e53cb1f 100644
--- a/sql/core/src/test/resources/log4j.properties
+++ b/sql/core/src/test/resources/log4j.properties
@@ -23,6 +23,7 @@ log4j.appender.CA=org.apache.log4j.ConsoleAppender
 log4j.appender.CA.layout=org.apache.log4j.PatternLayout
 log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n
 log4j.appender.CA.Threshold = WARN
+log4j.appender.CA.follow = true

 #File Appender

http://git-wip-us.apache.org/repos/asf/spark/blob/ad0de99f/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 085e4a4..4641a1a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -25,6 +25,7 @@ import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
 import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics}
 import org.apache.spark.sql.test.SharedSQLContext
@@ -376,9 +377,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
 class SQLListenerMemoryLeakSuite extends SparkFunSuite {

   test("no memory leak") {
-    val oldLogLevel = org.apache.log4j.Logger.getRootLogger().getLevel()
-    try {
-      org.apache.log4j.Logger.getRootLogger().setLevel(org.apache.log4j.Level.FATAL)
+    quietly {
       val conf = new SparkConf()
         .setMaster("local")
         .setAppName("test")
@@ -413,8 +412,6 @@ class SQLListenerMemoryLeakSuite extends SparkFunSuite {
       } finally {
         sc.stop()
       }
-    } finally {
-      org.apache.log4j.Logger.getRootLogger().setLevel(oldLogLevel)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ad0de99f/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
index daf08ef..35bb9fd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
@@ -49,7 +49,7 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
     sqlContext.streams.resetTerminated()
   }

-  test("listing") {
+  testQuietly("listing") {
     val (m1, ds1) = makeDataset
     val
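`testQuietly` builds on the `quietly` helper imported from org.apache.spark.sql.catalyst.util above. A hedged sketch of what such a helper typically looks like; the exact level used is an assumption here, the point is the suppress-then-restore pattern that replaces the hand-rolled try/finally the patch removes:

```scala
import org.apache.log4j.{Level, Logger}

// Run `f` with root logging suppressed, always restoring the previous level.
def quietly[T](f: => T): T = {
  val rootLogger = Logger.getRootLogger
  val oldLevel = rootLogger.getLevel
  rootLogger.setLevel(Level.OFF) // assumed level; could also be ERROR
  try {
    f
  } finally {
    rootLogger.setLevel(oldLevel)
  }
}
```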
spark git commit: [SPARK-13632][SQL] Move commands.scala to command package
Repository: spark
Updated Branches:
  refs/heads/master 941b270b7 -> 3edcc4022

[SPARK-13632][SQL] Move commands.scala to command package

## What changes were proposed in this pull request?

This patch simply moves things to a new package in an effort to reduce the size of the diff in #11048. Currently the new package only has one file, but in the future we'll add many new commands in SPARK-13139.

## How was this patch tested?

Jenkins.

Author: Andrew Or

Closes #11482 from andrewor14/commands-package.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3edcc402
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3edcc402
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3edcc402

Branch: refs/heads/master
Commit: 3edcc40223c8af12f64c2286420dc77817ab770e
Parents: 941b270
Author: Andrew Or
Authored: Thu Mar 3 15:24:38 2016 -0800
Committer: Reynold Xin
Committed: Thu Mar 3 15:24:38 2016 -0800

----
 .../scala/org/apache/spark/sql/DataFrame.scala    |   3 +-
 .../scala/org/apache/spark/sql/SQLContext.scala   |   1 +
 .../apache/spark/sql/execution/SparkQl.scala      |   1 +
 .../spark/sql/execution/SparkStrategies.scala     |   2 +-
 .../spark/sql/execution/command/commands.scala    | 423 +++
 .../apache/spark/sql/execution/commands.scala     | 421 --
 .../datasources/DataSourceStrategy.scala          |   5 +-
 .../datasources/InsertIntoDataSource.scala        |   2 +-
 .../InsertIntoHadoopFsRelation.scala              |   3 +-
 .../spark/sql/execution/datasources/ddl.scala     |   2 +-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala     |   2 +-
 .../SparkExecuteStatementOperation.scala          |   2 +-
 .../org/apache/spark/sql/hive/HiveContext.scala   |   1 +
 .../apache/spark/sql/hive/HiveStrategies.scala    |   3 +-
 .../hive/execution/CreateTableAsSelect.scala      |   2 +-
 .../sql/hive/execution/CreateViewAsSelect.scala   |   2 +-
 .../execution/DescribeHiveTableCommand.scala      |   2 +-
 .../sql/hive/execution/HiveNativeCommand.scala    |   2 +-
 .../spark/sql/hive/execution/commands.scala       |   2 +-
 .../apache/spark/sql/hive/test/TestHive.scala     |   2 +-
 .../sql/hive/execution/HiveComparisonTest.scala   |   2 +-
 .../apache/spark/sql/hive/parquetSuites.scala     |   3 +-
 22 files changed, 449 insertions(+), 439 deletions(-)
----

http://git-wip-us.apache.org/repos/asf/spark/blob/3edcc402/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 844bc0f..339e61e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -36,8 +36,9 @@ import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.usePrettyExpression
-import org.apache.spark.sql.execution.{ExplainCommand, FileRelation, LogicalRDD, Queryable,
+import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, Queryable,
   QueryExecution, SQLExecution}
+import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
 import org.apache.spark.sql.execution.python.EvaluatePython

http://git-wip-us.apache.org/repos/asf/spark/blob/3edcc402/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index cb4a639..39dad16 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.ShowTablesCommand
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
 import org.apache.spark.sql.internal.{SessionState, SQLConf}

http://git-wip-us.apache.org/repos/asf/spark/blob/3edcc402/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala
spark git commit: [MINOR] Fix typos in comments and testcase name of code
Repository: spark
Updated Branches:
  refs/heads/master 52035d103 -> 941b270b7

[MINOR] Fix typos in comments and testcase name of code

## What changes were proposed in this pull request?

This PR fixes typos in comments and testcase name of code.

## How was this patch tested?

manual.

Author: Dongjoon Hyun

Closes #11481 from dongjoon-hyun/minor_fix_typos_in_code.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/941b270b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/941b270b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/941b270b

Branch: refs/heads/master
Commit: 941b270b706d3b4aea73dbf102cfb6eee0beff63
Parents: 52035d1
Author: Dongjoon Hyun
Authored: Thu Mar 3 22:42:12 2016 +0000
Committer: Sean Owen
Committed: Thu Mar 3 22:42:12 2016 +0000

----
 core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala    | 2 +-
 core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala        | 4 ++--
 dev/run-tests.py                                                    | 2 +-
 .../org/apache/spark/examples/ml/JavaDeveloperApiExample.java       | 2 +-
 examples/src/main/python/mllib/naive_bayes_example.py               | 2 +-
 examples/src/main/python/mllib/ranking_metrics_example.py           | 2 +-
 examples/src/main/python/mllib/word2vec.py                          | 2 +-
 .../src/main/scala/org/apache/spark/examples/LocalFileLR.scala      | 2 +-
 .../src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala      | 2 +-
 examples/src/main/scala/org/apache/spark/examples/SparkLR.scala     | 2 +-
 .../main/scala/org/apache/spark/examples/sql/RDDRelation.scala      | 4 ++--
 .../org/apache/spark/examples/streaming/TwitterPopularTags.scala    | 2 +-
 graphx/src/test/scala/org/apache/spark/graphx/EdgeSuite.scala       | 2 +-
 mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala    | 2 +-
 project/SparkBuild.scala                                            | 2 +-
 python/pyspark/mllib/fpm.py                                         | 2 +-
 .../src/main/scala/org/apache/spark/repl/SparkIMain.scala           | 4 ++--
 .../src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala    | 2 +-
 .../test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala     | 2 +-
 19 files changed, 22 insertions(+), 22 deletions(-)
----

http://git-wip-us.apache.org/repos/asf/spark/blob/941b270b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala

diff --git a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
index f157a45..fa078ee 100644
--- a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
@@ -28,7 +28,7 @@ import org.apache.spark._
  * of them will be combined together, showed in one line.
  */
 private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
-  // Carrige return
+  // Carriage return
   val CR = '\r'
   // Update period of progress bar, in milliseconds
   val UPDATE_PERIOD = 200L

http://git-wip-us.apache.org/repos/asf/spark/blob/941b270b/core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala

diff --git a/core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala b/core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala
index daa795a..2fb09ea 100644
--- a/core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala
+++ b/core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala
@@ -26,11 +26,11 @@ package org.apache.sparktest
  */
 class ImplicitSuite {

-  // We only want to test if `implict` works well with the compiler, so we don't need a real
+  // We only want to test if `implicit` works well with the compiler, so we don't need a real
   // SparkContext.
   def mockSparkContext[T]: org.apache.spark.SparkContext = null

-  // We only want to test if `implict` works well with the compiler, so we don't need a real RDD.
+  // We only want to test if `implicit` works well with the compiler, so we don't need a real RDD.
   def mockRDD[T]: org.apache.spark.rdd.RDD[T] = null

   def testRddToPairRDDFunctions(): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/941b270b/dev/run-tests.py

diff --git a/dev/run-tests.py b/dev/run-tests.py
index b65d1a3..aa6af56 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -563,7 +563,7 @@ def main():

     # backwards compatibility checks
     if build_tool == "sbt":
-        # Note: compatiblity tests only supported in sbt for now
+        # Note: compatibility tests only supported in sbt for now
spark git commit: [SPARK-13423][HOTFIX] Static analysis fixes for 2.x / fixed for Scala 2.10, again
Repository: spark
Updated Branches:
  refs/heads/master ce58e99aa -> 52035d103

[SPARK-13423][HOTFIX] Static analysis fixes for 2.x / fixed for Scala 2.10, again

## What changes were proposed in this pull request?

Fixes (another) compile problem due to inadvertent use of Option.contains, which exists only in Scala 2.11.

## How was this patch tested?

Jenkins tests

Author: Sean Owen

Closes #11496 from srowen/SPARK-13423.3.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/52035d10
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/52035d10
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/52035d10

Branch: refs/heads/master
Commit: 52035d103661721a8f87c2f6788c6411f645a99d
Parents: ce58e99
Author: Sean Owen
Authored: Thu Mar 3 22:40:39 2016 +0000
Committer: Sean Owen
Committed: Thu Mar 3 22:40:39 2016 +0000

----
 .../scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----

http://git-wip-us.apache.org/repos/asf/spark/blob/52035d10/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala

diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index 935e280..b5385c1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -243,7 +243,7 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
     val resAfter = captor.getValue
     val resSizeBefore = resBefore.accumUpdates.find(_.name == Some(RESULT_SIZE)).flatMap(_.update)
     val resSizeAfter = resAfter.accumUpdates.find(_.name == Some(RESULT_SIZE)).flatMap(_.update)
-    assert(resSizeBefore.contains(0L))
+    assert(resSizeBefore.exists(_ == 0L))
     assert(resSizeAfter.exists(_.toString.toLong > 0L))
   }
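For reference, the three equivalent spellings. `Option.contains` was added in Scala 2.11, which is why it breaks the 2.10 build:

```scala
val resSize: Option[Long] = Some(0L)

// Scala 2.11+ only; this is what broke the Scala 2.10 build:
// resSize.contains(0L)

// Works on both 2.10 and 2.11 -- what the patch uses:
resSize.exists(_ == 0L)

// Also portable, and arguably the most direct:
resSize == Some(0L)
```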
spark git commit: [MINOR][ML][DOC] Remove duplicated periods at the end of some sharedParam
Repository: spark
Updated Branches:
  refs/heads/master cf95d728c -> ce58e99aa

[MINOR][ML][DOC] Remove duplicated periods at the end of some sharedParam

## What changes were proposed in this pull request?

Remove duplicated periods at the end of some sharedParams in ScalaDoc, such as [here](https://github.com/apache/spark/pull/11344/files#diff-9edc669edcf2c0c7cf1efe4a0a57da80L367)

cc mengxr srowen

## How was this patch tested?

Documents change, no test.

Author: Yanbo Liang

Closes #11344 from yanboliang/shared-cleanup.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce58e99a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce58e99a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce58e99a

Branch: refs/heads/master
Commit: ce58e99aae668d83cf69aa45993fb10a078394d9
Parents: cf95d72
Author: Yanbo Liang
Authored: Thu Mar 3 13:36:54 2016 -0800
Committer: Joseph K. Bradley
Committed: Thu Mar 3 13:36:54 2016 -0800

----
 .../ml/param/shared/SharedParamsCodeGen.scala   | 10 +-
 .../spark/ml/param/shared/sharedParams.scala    | 20 ++--
 2 files changed, 15 insertions(+), 15 deletions(-)
----

http://git-wip-us.apache.org/repos/asf/spark/blob/ce58e99a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala

diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
index 4aff749..3ce129b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
@@ -52,7 +52,7 @@ private[shared] object SharedParamsCodeGen {
       " to adjust the probability of predicting each class." +
       " Array must have length equal to the number of classes, with values >= 0." +
       " The class with largest value p/t is predicted, where p is the original probability" +
-      " of that class and t is the class' threshold.",
+      " of that class and t is the class' threshold",
       isValid = "(t: Array[Double]) => t.forall(_ >= 0)", finalMethods = false),
     ParamDesc[String]("inputCol", "input column name"),
     ParamDesc[Array[String]]("inputCols", "input column names"),
@@ -63,7 +63,7 @@ private[shared] object SharedParamsCodeGen {
     ParamDesc[Boolean]("fitIntercept", "whether to fit an intercept term", Some("true")),
     ParamDesc[String]("handleInvalid", "how to handle invalid entries. Options are skip (which " +
       "will filter out rows with bad values), or error (which will throw an errror). More " +
-      "options may be added later.",
+      "options may be added later",
       isValid = "ParamValidators.inArray(Array(\"skip\", \"error\"))"),
     ParamDesc[Boolean]("standardization", "whether to standardize the training features" +
       " before fitting the model", Some("true")),
@@ -72,11 +72,11 @@ private[shared] object SharedParamsCodeGen {
       " For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty",
       isValid = "ParamValidators.inRange(0, 1)"),
     ParamDesc[Double]("tol", "the convergence tolerance for iterative algorithms"),
-    ParamDesc[Double]("stepSize", "Step size to be used for each iteration of optimization."),
+    ParamDesc[Double]("stepSize", "Step size to be used for each iteration of optimization"),
     ParamDesc[String]("weightCol", "weight column name. If this is not set or empty, we treat " +
-      "all instance weights as 1.0."),
+      "all instance weights as 1.0"),
     ParamDesc[String]("solver", "the solver algorithm for optimization. If this is not set or " +
-      "empty, default value is 'auto'.", Some("\"auto\"")))
+      "empty, default value is 'auto'", Some("\"auto\"")))

   val code = genSharedParams(params)
   val file = "src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala"

http://git-wip-us.apache.org/repos/asf/spark/blob/ce58e99a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala

diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
index c088c16..96263c5 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
@@ -176,10 +176,10 @@ private[ml] trait HasThreshold extends Params {

 private[ml] trait
spark git commit: [SPARK-13543][SQL] Support for specifying compression codec for Parquet/ORC via option()
Repository: spark
Updated Branches:
  refs/heads/master 511d4929c -> cf95d728c

[SPARK-13543][SQL] Support for specifying compression codec for Parquet/ORC via option()

## What changes were proposed in this pull request?

This PR adds the support to specify compression codecs for both ORC and Parquet.

## How was this patch tested?

unittests within IDE and code style tests with `dev/run_tests`.

Author: hyukjinkwon

Closes #11464 from HyukjinKwon/SPARK-13543.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cf95d728
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cf95d728
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cf95d728

Branch: refs/heads/master
Commit: cf95d728c64f76e8b1065d7cacf1c3ad7769e935
Parents: 511d492
Author: hyukjinkwon
Authored: Thu Mar 3 10:30:55 2016 -0800
Committer: Reynold Xin
Committed: Thu Mar 3 10:30:55 2016 -0800

----
 python/pyspark/sql/readwriter.py                 | 55 ----
 .../org/apache/spark/sql/DataFrameWriter.scala   | 19 +--
 .../datasources/CompressionCodecs.scala          | 22 +---
 .../datasources/parquet/ParquetRelation.scala    | 16 +-
 .../execution/datasources/csv/CSVSuite.scala     | 46 
 .../execution/datasources/json/JsonSuite.scala   | 46 +++-
 .../execution/datasources/text/TextSuite.scala   | 55 
 .../apache/spark/sql/hive/orc/OrcRelation.scala  | 36 -
 .../sql/hive/orc/OrcHadoopFsRelationSuite.scala  | 29 ++-
 .../sources/ParquetHadoopFsRelationSuite.scala   | 20 +++
 10 files changed, 301 insertions(+), 43 deletions(-)
----

http://git-wip-us.apache.org/repos/asf/spark/blob/cf95d728/python/pyspark/sql/readwriter.py

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 7f5368d..438662b 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -454,7 +454,7 @@ class DataFrameWriter(object):
         self._jwrite.saveAsTable(name)

     @since(1.4)
-    def json(self, path, mode=None):
+    def json(self, path, mode=None, compression=None):
         """Saves the content of the :class:`DataFrame` in JSON format at the specified path.

         :param path: the path in any Hadoop supported file system
@@ -464,18 +464,19 @@ class DataFrameWriter(object):
             * ``overwrite``: Overwrite existing data.
             * ``ignore``: Silently ignore this operation if data already exists.
             * ``error`` (default case): Throw an exception if data already exists.
-
-        You can set the following JSON-specific option(s) for writing JSON files:
-            * ``compression`` (default ``None``): compression codec to use when saving to file.
-                This can be one of the known case-insensitive shorten names
-                (``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
+        :param compression: compression codec to use when saving to file. This can be one of the
+                            known case-insensitive shorten names (none, bzip2, gzip, lz4,
+                            snappy and deflate).

         >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
         """
-        self.mode(mode)._jwrite.json(path)
+        self.mode(mode)
+        if compression is not None:
+            self.option("compression", compression)
+        self._jwrite.json(path)

     @since(1.4)
-    def parquet(self, path, mode=None, partitionBy=None):
+    def parquet(self, path, mode=None, partitionBy=None, compression=None):
         """Saves the content of the :class:`DataFrame` in Parquet format at the specified path.

         :param path: the path in any Hadoop supported file system
@@ -486,32 +487,37 @@ class DataFrameWriter(object):
             * ``ignore``: Silently ignore this operation if data already exists.
             * ``error`` (default case): Throw an exception if data already exists.
         :param partitionBy: names of partitioning columns
+        :param compression: compression codec to use when saving to file. This can be one of the
+                            known case-insensitive shorten names (none, snappy, gzip, and lzo).
+                            This will overwrite ``spark.sql.parquet.compression.codec``.

         >>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)
         if partitionBy is not None:
             self.partitionBy(partitionBy)
+        if compression is not None:
+            self.option("compression", compression)
         self._jwrite.parquet(path)

     @since(1.6)
-    def text(self, path):
+    def text(self, path,
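The Scala writer gains the same hook through `option("compression", ...)`. A hedged usage sketch (paths are illustrative; codec lists as documented in the patch):

```scala
import org.apache.spark.sql.SQLContext

def writeCompressed(sqlContext: SQLContext): Unit = {
  val df = sqlContext.range(0, 100)

  // Parquet: one of none, snappy, gzip or lzo; overrides spark.sql.parquet.compression.codec.
  df.write.option("compression", "gzip").parquet("/tmp/data.parquet")

  // ORC: the same option is honored by the ORC data source.
  df.write.option("compression", "snappy").orc("/tmp/data.orc")
}
```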
spark git commit: [SPARK-12877][ML] Add train-validation-split to pyspark
Repository: spark
Updated Branches:
  refs/heads/master 9a48c656e -> 511d4929c

[SPARK-12877][ML] Add train-validation-split to pyspark

## What changes were proposed in this pull request?

The changes proposed were to add train-validation-split to pyspark.ml.tuning.

## How was this patch tested?

This patch was tested through unit tests located in pyspark/ml/test.py.

This is my original work and I license it to Spark.

Author: JeremyNixon

Closes #11335 from JeremyNixon/tvs_pyspark.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/511d4929
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/511d4929
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/511d4929

Branch: refs/heads/master
Commit: 511d4929c87ebf364b96bd46890628f736eaa026
Parents: 9a48c65
Author: JeremyNixon
Authored: Thu Mar 3 09:50:05 2016 -0800
Committer: Xiangrui Meng
Committed: Thu Mar 3 09:50:05 2016 -0800

----
 python/pyspark/ml/tests.py  |  53 ++-
 python/pyspark/ml/tuning.py | 193 ++-
 2 files changed, 244 insertions(+), 2 deletions(-)
----

http://git-wip-us.apache.org/repos/asf/spark/blob/511d4929/python/pyspark/ml/tests.py

diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index 5fcfa9e..8182fcf 100644
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -45,7 +45,7 @@ from pyspark.ml.feature import *
 from pyspark.ml.param import Param, Params
 from pyspark.ml.param.shared import HasMaxIter, HasInputCol, HasSeed
 from pyspark.ml.regression import LinearRegression
-from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
+from pyspark.ml.tuning import *
 from pyspark.ml.util import keyword_only
 from pyspark.mllib.linalg import DenseVector
 from pyspark.sql import DataFrame, SQLContext, Row
@@ -423,6 +423,57 @@ class CrossValidatorTests(PySparkTestCase):
         self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1")


+class TrainValidationSplitTests(PySparkTestCase):
+
+    def test_fit_minimize_metric(self):
+        sqlContext = SQLContext(self.sc)
+        dataset = sqlContext.createDataFrame([
+            (10, 10.0),
+            (50, 50.0),
+            (100, 100.0),
+            (500, 500.0)] * 10,
+            ["feature", "label"])
+
+        iee = InducedErrorEstimator()
+        evaluator = RegressionEvaluator(metricName="rmse")
+
+        grid = (ParamGridBuilder()
+                .addGrid(iee.inducedError, [100.0, 0.0, 1.0])
+                .build())
+        tvs = TrainValidationSplit(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
+        tvsModel = tvs.fit(dataset)
+        bestModel = tvsModel.bestModel
+        bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))
+
+        self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
+                         "Best model should have zero induced error")
+        self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0")
+
+    def test_fit_maximize_metric(self):
+        sqlContext = SQLContext(self.sc)
+        dataset = sqlContext.createDataFrame([
+            (10, 10.0),
+            (50, 50.0),
+            (100, 100.0),
+            (500, 500.0)] * 10,
+            ["feature", "label"])
+
+        iee = InducedErrorEstimator()
+        evaluator = RegressionEvaluator(metricName="r2")
+
+        grid = (ParamGridBuilder()
+                .addGrid(iee.inducedError, [100.0, 0.0, 1.0])
+                .build())
+        tvs = TrainValidationSplit(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
+        tvsModel = tvs.fit(dataset)
+        bestModel = tvsModel.bestModel
+        bestModelMetric = evaluator.evaluate(bestModel.transform(dataset))
+
+        self.assertEqual(0.0, bestModel.getOrDefault('inducedError'),
+                         "Best model should have zero induced error")
+        self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1")
+
+
 class PersistenceTest(PySparkTestCase):

     def test_linear_regression(self):

http://git-wip-us.apache.org/repos/asf/spark/blob/511d4929/python/pyspark/ml/tuning.py

diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py
index 0cbe97f..77af009 100644
--- a/python/pyspark/ml/tuning.py
+++ b/python/pyspark/ml/tuning.py
@@ -25,7 +25,8 @@ from pyspark.ml.param.shared import HasSeed
 from pyspark.ml.util import keyword_only
 from pyspark.sql.functions import rand

-__all__ = ['ParamGridBuilder', 'CrossValidator', 'CrossValidatorModel']
+__all__ = ['ParamGridBuilder',
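The new Python API mirrors the existing Scala one in org.apache.spark.ml.tuning. A hedged Scala sketch of the equivalent workflow (estimator choice and data are placeholders):

```scala
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}

val lr = new LinearRegression()
val grid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .build()

// Unlike CrossValidator, each parameter combination is evaluated once:
// 75% of the data is used for training, the remaining 25% for validation.
val tvs = new TrainValidationSplit()
  .setEstimator(lr)
  .setEstimatorParamMaps(grid)
  .setEvaluator(new RegressionEvaluator())
  .setTrainRatio(0.75)

// val model = tvs.fit(trainingData)  // trainingData: a DataFrame of (features, label)
```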
spark git commit: [SPARK-13013][DOCS] Replace example code in mllib-clustering.md using include_example
Repository: spark
Updated Branches:
  refs/heads/master 645c3a85e -> 70f6f9649

[SPARK-13013][DOCS] Replace example code in mllib-clustering.md using include_example

Replace example code in mllib-clustering.md using include_example
https://issues.apache.org/jira/browse/SPARK-13013

The example code in the user guide is embedded in the markdown and hence it is not easy to test. It would be nice to automatically test them. This JIRA is to discuss options to automate example code testing and see what we can do in Spark 1.6.

Goal is to move actual example code to spark/examples and test compilation in Jenkins builds. Then in the markdown, we can reference part of the code to show in the user guide. This requires adding a Jekyll tag that is similar to https://github.com/jekyll/jekyll/blob/master/lib/jekyll/tags/include.rb, e.g., called include_example.

`{% include_example scala/org/apache/spark/examples/mllib/KMeansExample.scala %}`

Jekyll will find `examples/src/main/scala/org/apache/spark/examples/mllib/KMeansExample.scala` and pick code blocks marked "example" and replace code block in `{% highlight %}` in the markdown.

See more sub-tasks in parent ticket: https://issues.apache.org/jira/browse/SPARK-11337

Author: Xin Ren

Closes #6 from keypointt/SPARK-13013.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/70f6f964
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/70f6f964
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/70f6f964

Branch: refs/heads/master
Commit: 70f6f9649bdb13b6745473b7edc4cd06b10f99d2
Parents: 645c3a8
Author: Xin Ren
Authored: Thu Mar 3 09:32:47 2016 -0800
Committer: Xiangrui Meng
Committed: Thu Mar 3 09:32:47 2016 -0800

----
 data/mllib/streaming_kmeans_data_test.txt       |   2 +
 docs/mllib-clustering.md                        | 460 +--
 .../mllib/JavaGaussianMixtureExample.java       |  72 +++
 .../spark/examples/mllib/JavaKMeansExample.java |  72 +++
 .../JavaLatentDirichletAllocationExample.java   |  93 
 .../JavaPowerIterationClusteringExample.java    |   4 +
 .../python/mllib/gaussian_mixture_example.py    |  51 ++
 .../src/main/python/mllib/k_means_example.py    |  55 +++
 .../latent_dirichlet_allocation_example.py      |  54 +++
 .../mllib/power_iteration_clustering_example.py |  44 ++
 .../python/mllib/streaming_k_means_example.py   |  66 +++
 .../examples/mllib/GaussianMixtureExample.scala |  57 +++
 .../spark/examples/mllib/KMeansExample.scala    |  56 +++
 .../LatentDirichletAllocationExample.scala      |  62 +++
 .../mllib/PowerIterationClusteringExample.scala |   8 +-
 .../examples/mllib/StreamingKMeansExample.scala |   6 +-
 16 files changed, 715 insertions(+), 447 deletions(-)
----

http://git-wip-us.apache.org/repos/asf/spark/blob/70f6f964/data/mllib/streaming_kmeans_data_test.txt

diff --git a/data/mllib/streaming_kmeans_data_test.txt b/data/mllib/streaming_kmeans_data_test.txt
new file mode 100644
index 000..649a0d6
--- /dev/null
+++ b/data/mllib/streaming_kmeans_data_test.txt
@@ -0,0 +1,2 @@
+(1.0), [1.7, 0.4, 0.9]
+(2.0), [2.2, 1.8, 0.0]

http://git-wip-us.apache.org/repos/asf/spark/blob/70f6f964/docs/mllib-clustering.md

diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md
index 8e724fb..4472014 100644
--- a/docs/mllib-clustering.md
+++ b/docs/mllib-clustering.md
@@ -49,27 +49,7 @@ optimal *k* is usually one where there is an "elbow" in the WSSSE graph.

 Refer to the [`KMeans` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.KMeans) and [`KMeansModel` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.KMeansModel) for details on the API.

-{% highlight scala %}
-import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
-import org.apache.spark.mllib.linalg.Vectors
-
-// Load and parse the data
-val data = sc.textFile("data/mllib/kmeans_data.txt")
-val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
-
-// Cluster the data into two classes using KMeans
-val numClusters = 2
-val numIterations = 20
-val clusters = KMeans.train(parsedData, numClusters, numIterations)
-
-// Evaluate clustering by computing Within Set Sum of Squared Errors
-val WSSSE = clusters.computeCost(parsedData)
-println("Within Set Sum of Squared Errors = " + WSSSE)
-
-// Save and load model
-clusters.save(sc, "myModelPath")
-val sameModel = KMeansModel.load(sc, "myModelPath")
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/mllib/KMeansExample.scala %}

@@ -81,51 +61,7 @@ that is equivalent to the provided example in
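Under the include_example mechanism, the referenced example file carries marker comments that the Jekyll tag extracts; everything between the markers is what the user guide shows. A hedged sketch of such an example file, built from the doc snippet removed above (the `$example on$`/`$example off$` markers reflect the convention in spark/examples; setup code is kept outside the markers so it never appears in the guide):

```scala
package org.apache.spark.examples.mllib

import org.apache.spark.{SparkConf, SparkContext}
// $example on$
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors
// $example off$

object KMeansExampleSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("KMeansExample"))
    // $example on$
    // Load and parse the data, then cluster it into two classes using KMeans.
    val data = sc.textFile("data/mllib/kmeans_data.txt")
    val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
    val clusters = KMeans.train(parsedData, 2, 20)
    println("Within Set Sum of Squared Errors = " + clusters.computeCost(parsedData))
    // $example off$
    sc.stop()
  }
}
```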
spark git commit: [SPARK-13465] Add a task failure listener to TaskContext
Repository: spark Updated Branches: refs/heads/branch-1.6 fedb81360 -> 1ce2c1235 [SPARK-13465] Add a task failure listener to TaskContext ## What changes were proposed in this pull request? TaskContext supports task completion callback, which gets called regardless of task failures. However, there is no way for the listener to know if there is an error. This patch adds a new listener that gets called when a task fails. ## How was this patch tested? New unit test case and integration test case covering the code path Author: Davies LiuCloses #11478 from davies/add_failure_1.6. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1ce2c123 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1ce2c123 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1ce2c123 Branch: refs/heads/branch-1.6 Commit: 1ce2c123565630914cdcafe6549582843047d874 Parents: fedb813 Author: Davies Liu Authored: Thu Mar 3 08:43:38 2016 -0800 Committer: Davies Liu Committed: Thu Mar 3 08:43:38 2016 -0800 -- .../scala/org/apache/spark/TaskContext.scala| 28 +++- .../org/apache/spark/TaskContextImpl.scala | 33 -- .../scala/org/apache/spark/scheduler/Task.scala | 5 ++ .../spark/util/TaskCompletionListener.scala | 33 -- .../util/TaskCompletionListenerException.scala | 34 -- .../org/apache/spark/util/taskListeners.scala | 68 .../spark/JavaTaskCompletionListenerImpl.java | 39 --- .../spark/JavaTaskContextCompileCheck.java | 30 + .../spark/scheduler/TaskContextSuite.scala | 52 +-- project/MimaExcludes.scala | 4 +- 10 files changed, 205 insertions(+), 121 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1ce2c123/core/src/main/scala/org/apache/spark/TaskContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index af558d6..190c8ea 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -23,7 +23,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.executor.TaskMetrics import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.metrics.source.Source -import org.apache.spark.util.TaskCompletionListener +import org.apache.spark.util.{TaskCompletionListener, TaskFailureListener} object TaskContext { @@ -108,6 +108,8 @@ abstract class TaskContext extends Serializable { * Adds a (Java friendly) listener to be executed on task completion. * This will be called in all situation - success, failure, or cancellation. * An example use is for HadoopRDD to register a callback to close the input stream. + * + * Exceptions thrown by the listener will result in failure of the task. */ def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext @@ -115,8 +117,30 @@ abstract class TaskContext extends Serializable { * Adds a listener in the form of a Scala closure to be executed on task completion. * This will be called in all situations - success, failure, or cancellation. * An example use is for HadoopRDD to register a callback to close the input stream. + * + * Exceptions thrown by the listener will result in failure of the task. + */ + def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext = { +addTaskCompletionListener(new TaskCompletionListener { + override def onTaskCompletion(context: TaskContext): Unit = f(context) +}) + } + + /** + * Adds a listener to be executed on task failure. 
+ * Operations defined here must be idempotent, as `onTaskFailure` can be called multiple times. */ - def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext + def addTaskFailureListener(listener: TaskFailureListener): TaskContext + + /** + * Adds a listener to be executed on task failure. + * Operations defined here must be idempotent, as `onTaskFailure` can be called multiple times. + */ + def addTaskFailureListener(f: (TaskContext, Throwable) => Unit): TaskContext = { +addTaskFailureListener(new TaskFailureListener { + override def onTaskFailure(context: TaskContext, error: Throwable): Unit = f(context, error) +}) + } /** * Adds a callback function to be executed on task completion. An example use http://git-wip-us.apache.org/repos/asf/spark/blob/1ce2c123/core/src/main/scala/org/apache/spark/TaskContextImpl.scala -- diff --git
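A minimal sketch of how the two listener APIs combine from user code, using only the methods visible in the diff above; the SparkContext `sc`, the sample data, and the stderr logging are illustrative assumptions, not part of the patch:

    import org.apache.spark.{SparkContext, TaskContext}

    def countWithListeners(sc: SparkContext): Long = {
      sc.parallelize(1 to 100, 4).mapPartitions { iter =>
        val ctx = TaskContext.get()
        // New in this patch: fires only when the task fails. Per the scaladoc,
        // onTaskFailure can be called multiple times, so keep the body idempotent.
        ctx.addTaskFailureListener { (context, error) =>
          System.err.println(s"Task ${context.taskAttemptId()} failed: ${error.getMessage}")
        }
        // Pre-existing API: fires on success, failure, or cancellation alike.
        ctx.addTaskCompletionListener { context =>
          System.err.println(s"Task ${context.taskAttemptId()} completed")
        }
        iter
      }.count()
    }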
spark git commit: [SPARK-13423][HOTFIX] Static analysis fixes for 2.x / fixed for Scala 2.10
Repository: spark Updated Branches: refs/heads/master b5f02d674 -> 645c3a85e [SPARK-13423][HOTFIX] Static analysis fixes for 2.x / fixed for Scala 2.10 ## What changes were proposed in this pull request? Fixes a compile problem due to inadvertent use of `Option.contains`, which exists only in Scala 2.11. The change should have been to replace `Option.exists(_ == x)` with `== Some(x)`. Replacing exists with contains only makes sense for collections. Replacing the use of `Option.exists` still makes sense, though, as it reads misleadingly. ## How was this patch tested? Jenkins tests / compilation Author: Sean Owen. Closes #11493 from srowen/SPARK-13423.2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/645c3a85 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/645c3a85 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/645c3a85 Branch: refs/heads/master Commit: 645c3a85e2029928d37ec2de9ef5a2d884620b9b Parents: b5f02d67 Author: Sean Owen Authored: Thu Mar 3 15:11:02 2016 + Committer: Sean Owen Committed: Thu Mar 3 15:11:02 2016 + -- core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala | 2 +- core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala | 2 +- .../main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/645c3a85/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala index 2fd630a..0d0e9b0 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala @@ -29,7 +29,7 @@ private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") { val operationGraphListener = parent.operationGraphListener def isFairScheduler: Boolean = -jobProgresslistener.schedulingMode.contains(SchedulingMode.FAIR) +jobProgresslistener.schedulingMode == Some(SchedulingMode.FAIR) attachPage(new AllJobsPage(this)) attachPage(new JobPage(this)) http://git-wip-us.apache.org/repos/asf/spark/blob/645c3a85/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala index ece5d0f..bd5f16d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala @@ -34,7 +34,7 @@ private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages" attachPage(new StagePage(this)) attachPage(new PoolPage(this)) - def isFairScheduler: Boolean = progressListener.schedulingMode.contains(SchedulingMode.FAIR) + def isFairScheduler: Boolean = progressListener.schedulingMode == Some(SchedulingMode.FAIR) def handleKillRequest(request: HttpServletRequest): Unit = { if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) { http://git-wip-us.apache.org/repos/asf/spark/blob/645c3a85/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index f9d1029..41762fc 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -335,7 +335,7 @@ private[spark] class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI) taskEnd.taskInfo.accumulables.flatMap { a => // Filter out accumulators that are not SQL metrics // For now we assume all SQL metrics are Long's that have been JSON serialized as String's -if (a.metadata.contains(SQLMetrics.ACCUM_IDENTIFIER)) { +if (a.metadata == Some(SQLMetrics.ACCUM_IDENTIFIER)) { val newValue = new LongSQLMetricValue(a.update.map(_.toString.toLong).getOrElse(0L)) Some(a.copy(update = Some(newValue))) } else { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
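The compatibility point in miniature, with a plain String standing in for the SchedulingMode value (a sketch, not code from the patch):

    val mode: Option[String] = Some("FAIR")

    mode.exists(_ == "FAIR")  // compiles on 2.10 and 2.11, but reads like a collection test
    // mode.contains("FAIR")  // clearer, but Option.contains was only added in Scala 2.11,
    //                        // so this line does not compile against 2.10
    mode == Some("FAIR")      // the portable spelling this hotfix settles on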
[2/3] spark git commit: [SPARK-13583][CORE][STREAMING] Remove unused imports and add checkstyle rule
http://git-wip-us.apache.org/repos/asf/spark/blob/b5f02d67/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala index 9f7c7d5..2770b8a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala @@ -19,7 +19,6 @@ package org.apache.spark.examples.streaming import scala.collection.mutable.LinkedHashSet -import scala.reflect.ClassTag import scala.util.Random import akka.actor._ http://git-wip-us.apache.org/repos/asf/spark/blob/b5f02d67/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala index ad13d43..5ce5778 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala @@ -18,7 +18,7 @@ // scalastyle:off println package org.apache.spark.examples.streaming -import java.io.{BufferedReader, InputStream, InputStreamReader} +import java.io.{BufferedReader, InputStreamReader} import java.net.Socket import org.apache.spark.{Logging, SparkConf} http://git-wip-us.apache.org/repos/asf/spark/blob/b5f02d67/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala index fe3b79e..dd725d7 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala @@ -18,10 +18,7 @@ // scalastyle:off println package org.apache.spark.examples.streaming -import java.net.InetSocketAddress - import org.apache.spark.SparkConf -import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.streaming.flume._ import org.apache.spark.util.IntParam http://git-wip-us.apache.org/repos/asf/spark/blob/b5f02d67/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala index 9aa0f54..3727f8f 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala @@ -24,7 +24,6 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext, Time} -import org.apache.spark.util.IntParam /** * Use DataFrames and SQL to count words in UTF8 encoded, '\n' delimited text received from the http://git-wip-us.apache.org/repos/asf/spark/blob/b5f02d67/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala 
b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala index c85d684..2811e67 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala @@ -18,7 +18,6 @@ // scalastyle:off println package org.apache.spark.examples.streaming -import org.apache.spark.HashPartitioner import org.apache.spark.SparkConf import org.apache.spark.streaming._ http://git-wip-us.apache.org/repos/asf/spark/blob/b5f02d67/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala index 825c671..5af82e1 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala +++
[3/3] spark git commit: [SPARK-13583][CORE][STREAMING] Remove unused imports and add checkstyle rule
[SPARK-13583][CORE][STREAMING] Remove unused imports and add checkstyle rule ## What changes were proposed in this pull request? After SPARK-6990, `dev/lint-java` keeps Java code healthy and helps PR review by saving much time. This issue aims to remove unused imports from Java/Scala code and to add an `UnusedImports` checkstyle rule to help developers. ## How was this patch tested? ``` ./dev/lint-java ./build/sbt compile ``` Author: Dongjoon Hyun. Closes #11438 from dongjoon-hyun/SPARK-13583. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b5f02d67 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b5f02d67 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b5f02d67 Branch: refs/heads/master Commit: b5f02d6743ecb1633b7b13382f76cb8bfc2aa95c Parents: e97fc7f Author: Dongjoon Hyun Authored: Thu Mar 3 10:12:32 2016 + Committer: Sean Owen Committed: Thu Mar 3 10:12:32 2016 + -- checkstyle.xml| 1 + .../java/org/apache/spark/network/protocol/OneWayMessage.java | 1 - .../java/org/apache/spark/network/protocol/RpcRequest.java| 1 - .../java/org/apache/spark/network/protocol/RpcResponse.java | 1 - .../java/org/apache/spark/network/protocol/StreamFailure.java | 3 --- .../java/org/apache/spark/network/protocol/StreamRequest.java | 3 --- .../org/apache/spark/network/protocol/StreamResponse.java | 1 - .../java/org/apache/spark/network/sasl/SaslEncryption.java| 1 - .../apache/spark/network/server/TransportRequestHandler.java | 1 - .../main/java/org/apache/spark/network/util/NettyUtils.java | 2 -- .../org/apache/spark/network/util/TransportFrameDecoder.java | 1 - .../org/apache/spark/network/sasl/ShuffleSecretManager.java | 1 - .../spark/network/shuffle/ExternalShuffleSecuritySuite.java | 1 - .../org/apache/spark/shuffle/sort/PackedRecordPointer.java| 4 +--- .../org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java| 1 - .../spark/util/collection/unsafe/sort/PrefixComparators.java | 1 - .../collection/unsafe/sort/RecordPointerAndKeyPrefix.java | 4 +--- .../scala/org/apache/spark/ExecutorAllocationManager.scala| 2 +- core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala | 1 - core/src/main/scala/org/apache/spark/Partitioner.scala| 4 ++-- core/src/main/scala/org/apache/spark/SparkContext.scala | 1 - .../scala/org/apache/spark/TaskNotSerializableException.scala | 2 -- core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala | 3 --- .../scala/org/apache/spark/broadcast/BroadcastFactory.scala | 1 - .../main/scala/org/apache/spark/deploy/RPackageUtils.scala| 2 +- .../scala/org/apache/spark/deploy/master/DriverInfo.scala | 1 - .../main/scala/org/apache/spark/deploy/worker/Worker.scala| 2 +- .../scala/org/apache/spark/deploy/worker/ui/LogPage.scala | 1 - .../main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala | 1 - core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala| 2 +- .../main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala | 2 +- .../org/apache/spark/rdd/ReliableRDDCheckpointData.scala | 1 - .../main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala | 1 - .../src/main/scala/org/apache/spark/scheduler/ActiveJob.scala | 1 - .../scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala | 2 -- .../main/scala/org/apache/spark/scheduler/TaskScheduler.scala | 1 - .../spark/scheduler/cluster/CoarseGrainedClusterMessage.scala | 2 +- .../main/scala/org/apache/spark/serializer/Serializer.scala | 4 ++-- .../scala/org/apache/spark/shuffle/BaseShuffleHandle.scala| 3 +-- .../scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala 
| 2 -- .../org/apache/spark/shuffle/hash/HashShuffleWriter.scala | 1 - .../org/apache/spark/shuffle/sort/SortShuffleWriter.scala | 1 - .../scala/org/apache/spark/storage/BlockManagerMaster.scala | 2 +- core/src/main/scala/org/apache/spark/storage/DiskStore.scala | 1 - core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 2 +- .../org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala | 1 - core/src/main/scala/org/apache/spark/util/SizeEstimator.scala | 1 - .../org/apache/spark/util/collection/AppendOnlyMap.scala | 2 +- .../java/org/apache/spark/launcher/SparkLauncherSuite.java| 3 --- .../org/apache/spark/serializer/TestJavaSerializerImpl.java | 1 - .../test/scala/org/apache/spark/MapOutputTrackerSuite.scala | 2 +- core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala| 3 --- core/src/test/scala/org/apache/spark/ThreadingSuite.scala | 4 +--- .../org/apache/spark/deploy/LogUrlsStandaloneSuite.scala | 2 +- .../scala/org/apache/spark/deploy/client/AppClientSuite.scala | 1 -
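The `checkstyle.xml | 1 +` entry in the diffstat corresponds to enabling Checkstyle's standard unused-import check. The module itself is one line; its exact position inside Spark's checkstyle.xml is not shown in this digest, so the snippet below is the usual form rather than the literal diff:

    <module name="UnusedImports"/>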
[3/3] spark git commit: [SPARK-13423][WIP][CORE][SQL][STREAMING] Static analysis fixes for 2.x
[SPARK-13423][WIP][CORE][SQL][STREAMING] Static analysis fixes for 2.x ## What changes were proposed in this pull request? Make some cross-cutting code improvements according to static analysis. These are individually up for discussion since they exist in separate commits that can be reverted. The changes are broadly: - Inner class should be static - Mismatched hashCode/equals - Overflow in compareTo - Unchecked warnings - Misuse of assert, vs junit.assert - get(a) + getOrElse(b) -> getOrElse(a,b) - Array/String .size -> .length (occasionally, -> .isEmpty / .nonEmpty) to avoid implicit conversions - Dead code - tailrec - exists(_ == ) -> contains - find + nonEmpty -> exists - filter + size -> count - reduce(_+_) -> sum - map + flatten -> flatMap The most controversial may be .size -> .length simply because of its size. It is intended to avoid implicits that might be expensive in some places. ## How was this patch tested? Existing Jenkins unit tests. Author: Sean Owen. Closes #11292 from srowen/SPARK-13423. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e97fc7f1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e97fc7f1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e97fc7f1 Branch: refs/heads/master Commit: e97fc7f176f8bf501c9b3afd8410014e3b0e1602 Parents: 02b7677 Author: Sean Owen Authored: Thu Mar 3 09:54:09 2016 + Committer: Sean Owen Committed: Thu Mar 3 09:54:09 2016 + -- .../network/ChunkFetchIntegrationSuite.java | 2 +- .../network/RequestTimeoutIntegrationSuite.java | 12 +++ .../spark/network/RpcIntegrationSuite.java | 2 +- .../network/TransportClientFactorySuite.java| 7 ++-- .../protocol/MessageWithHeaderSuite.java| 6 ++-- .../shuffle/protocol/mesos/RegisterDriver.java | 8 + .../ExternalShuffleIntegrationSuite.java| 2 +- .../shuffle/RetryingBlockFetcherSuite.java | 2 +- .../types/UTF8StringPropertyCheckSuite.scala| 2 +- .../shuffle/sort/ShuffleInMemorySorter.java | 4 ++- .../unsafe/sort/UnsafeExternalSorter.java | 2 +- .../scala/org/apache/spark/Dependency.scala | 2 +- .../scala/org/apache/spark/Partitioner.scala| 8 ++--- .../scala/org/apache/spark/SparkContext.scala | 7 ++-- .../scala/org/apache/spark/TaskEndReason.scala | 1 - .../apache/spark/deploy/ClientArguments.scala | 2 ++ .../org/apache/spark/deploy/SparkSubmit.scala | 3 ++ .../deploy/history/HistoryServerArguments.scala | 3 ++ .../spark/deploy/master/MasterArguments.scala | 3 ++ .../spark/deploy/master/MasterSource.scala | 4 +-- .../master/ZooKeeperPersistenceEngine.scala | 2 +- .../spark/deploy/master/ui/MasterPage.scala | 10 +++--- .../mesos/MesosClusterDispatcherArguments.scala | 3 ++ .../deploy/rest/RestSubmissionClient.scala | 6 ++-- .../deploy/rest/mesos/MesosRestServer.scala | 2 +- .../org/apache/spark/deploy/worker/Worker.scala | 7 ++-- .../spark/deploy/worker/WorkerArguments.scala | 3 ++ .../partial/ApproximateActionListener.scala | 2 +- .../apache/spark/rdd/LocalCheckpointRDD.scala | 2 +- .../apache/spark/scheduler/DAGScheduler.scala | 2 ++ .../spark/scheduler/TaskSchedulerImpl.scala | 2 +- .../apache/spark/scheduler/TaskSetManager.scala | 4 +-- .../spark/serializer/JavaSerializer.scala | 2 +- .../spark/status/api/v1/AllRDDResource.scala| 2 +- .../spark/status/api/v1/OneJobResource.scala| 2 +- .../org/apache/spark/storage/StorageUtils.scala | 6 ++-- .../apache/spark/ui/ConsoleProgressBar.scala| 2 +- .../org/apache/spark/ui/jobs/JobsTab.scala | 2 +- .../org/apache/spark/ui/jobs/StagesTab.scala| 2 +- 
.../ui/scope/RDDOperationGraphListener.scala| 5 ++- .../apache/spark/ui/storage/StoragePage.scala | 2 +- .../scala/org/apache/spark/util/Utils.scala | 2 ++ .../util/logging/RollingFileAppender.scala | 2 +- .../spark/memory/TaskMemoryManagerSuite.java| 36 ++-- .../sort/ShuffleInMemorySorterSuite.java| 2 +- .../shuffle/sort/UnsafeShuffleWriterSuite.java | 6 ++-- .../unsafe/sort/UnsafeExternalSorterSuite.java | 22 ++-- .../unsafe/sort/UnsafeInMemorySorterSuite.java | 3 +- .../org/apache/spark/AccumulatorSuite.scala | 2 +- .../scala/org/apache/spark/SparkConfSuite.scala | 2 +- .../rdd/ParallelCollectionSplitSuite.scala | 12 +++ .../scala/org/apache/spark/rdd/RDDSuite.scala | 6 ++-- .../apache/spark/scheduler/MapStatusSuite.scala | 2 +- .../spark/scheduler/TaskResultGetterSuite.scala | 2 +- .../BypassMergeSortShuffleWriterSuite.scala | 2 +- .../apache/spark/util/SparkConfWithEnv.scala| 4 +-- .../examples/streaming/JavaActorWordCount.java | 4 ++- .../spark/examples/DFSReadWriteTest.scala
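A few of the rewrite patterns from the list above, spelled out on throwaway values (illustrative only; the comments show the shape each call replaces):

    val xs = Seq(1, 2, 3)

    xs.contains(2)    // instead of xs.exists(_ == 2)
    xs.exists(_ > 2)  // instead of xs.find(_ > 2).nonEmpty
    xs.count(_ > 1)   // instead of xs.filter(_ > 1).size
    xs.sum            // instead of xs.reduce(_ + _); sum is also total on an empty Seq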
[1/3] spark git commit: [SPARK-13423][WIP][CORE][SQL][STREAMING] Static analysis fixes for 2.x
Repository: spark Updated Branches: refs/heads/master 02b7677e9 -> e97fc7f17 http://git-wip-us.apache.org/repos/asf/spark/blob/e97fc7f1/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 145b5f7..a89ed48 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -178,7 +178,7 @@ case class InsertIntoHiveTable( // loadPartition call orders directories created on the iteration order of this map val orderedPartitionSpec = new util.LinkedHashMap[String, String]() table.hiveQlTable.getPartCols.asScala.foreach { entry => -orderedPartitionSpec.put(entry.getName, partitionSpec.get(entry.getName).getOrElse("")) +orderedPartitionSpec.put(entry.getName, partitionSpec.getOrElse(entry.getName, "")) } // inheritTableSpecs is set to true. It should be set to false for an IMPORT query http://git-wip-us.apache.org/repos/asf/spark/blob/e97fc7f1/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index eedb42c..d397688 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -160,7 +160,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { require(batchDuration != null, "Batch duration has not been set") // assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration + // " is very low") - require(getOutputStreams().size > 0, "No output operations registered, so nothing to execute") + require(getOutputStreams().nonEmpty, "No output operations registered, so nothing to execute") } } http://git-wip-us.apache.org/repos/asf/spark/blob/e97fc7f1/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index a25dada..7fba2e8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -276,7 +276,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( config) case None => context.sparkContext.newAPIHadoopFile[K, V, F](file) } - if (rdd.partitions.size == 0) { + if (rdd.partitions.isEmpty) { logError("File " + file + " has no data in it. Spark Streaming can only ingest " + "files that have been \"moved\" to the directory assigned to the file stream. 
" + "Refer to the streaming programming guide for more details.") http://git-wip-us.apache.org/repos/asf/spark/blob/e97fc7f1/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala index 49d8f14..fd3e72e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -108,7 +108,7 @@ abstract class ReceiverInputDStream[T: ClassTag](_ssc: StreamingContext) } else { // Else, create a BlockRDD. However, if there are some blocks with WAL info but not // others then that is unexpected and log a warning accordingly. -if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) { +if (blockInfos.exists(_.walRecordHandleOption.nonEmpty)) { if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) { logError("Some blocks do not have Write Ahead Log information; " + "this is unexpected and data may not be recoverable after driver failures") @@ -119,7 +119,7 @@ abstract class ReceiverInputDStream[T: ClassTag](_ssc: StreamingContext) val validBlockIds = blockIds.filter { id => ssc.sparkContext.env.blockManager.master.contains(id) } -if
[2/3] spark git commit: [SPARK-13423][WIP][CORE][SQL][STREAMING] Static analysis fixes for 2.x
http://git-wip-us.apache.org/repos/asf/spark/blob/e97fc7f1/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala index 4b43550..773a2e5 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala @@ -69,7 +69,7 @@ object PageViewStream { .groupByKey() val errorRatePerZipCode = statusesPerZipCode.map{ case(zip, statuses) => -val normalCount = statuses.filter(_ == 200).size +val normalCount = statuses.count(_ == 200) val errorCount = statuses.size - normalCount val errorRatio = errorCount.toFloat / statuses.size if (errorRatio > 0.05) { http://git-wip-us.apache.org/repos/asf/spark/blob/e97fc7f1/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala -- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala index 4eb1556..475167a 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala @@ -79,7 +79,7 @@ class KafkaRDD[ .map(_.asInstanceOf[KafkaRDDPartition]) .filter(_.count > 0) -if (num < 1 || nonEmptyPartitions.size < 1) { +if (num < 1 || nonEmptyPartitions.isEmpty) { return new Array[R](0) } http://git-wip-us.apache.org/repos/asf/spark/blob/e97fc7f1/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 3e8c385..87f3bc3 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -284,7 +284,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali if (selectedVertices.count > 1) { found = true val collectedVertices = selectedVertices.collect() -retVal = collectedVertices(Random.nextInt(collectedVertices.size)) +retVal = collectedVertices(Random.nextInt(collectedVertices.length)) } } retVal http://git-wip-us.apache.org/repos/asf/spark/blob/e97fc7f1/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala index 53a9f92..5a0c479 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala @@ -276,7 +276,7 @@ object VertexRDD { def apply[VD: ClassTag](vertices: RDD[(VertexId, VD)]): VertexRDD[VD] = { val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match { case Some(p) => vertices - case None => vertices.partitionBy(new HashPartitioner(vertices.partitions.size)) + case None => vertices.partitionBy(new HashPartitioner(vertices.partitions.length)) } val vertexPartitions = vPartitioned.mapPartitions( iter => Iterator(ShippableVertexPartition(iter)), @@ -317,7 +317,7 @@ object VertexRDD { ): VertexRDD[VD] = { val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match { case Some(p) => vertices - case None => vertices.partitionBy(new 
HashPartitioner(vertices.partitions.size)) + case None => vertices.partitionBy(new HashPartitioner(vertices.partitions.length)) } val routingTables = createRoutingTables(edges, vPartitioned.partitioner.get) val vertexPartitions = vPartitioned.zipPartitions(routingTables, preservesPartitioning = true) { @@ -358,7 +358,7 @@ object VertexRDD { Function.tupled(RoutingTablePartition.edgePartitionToMsgs))) .setName("VertexRDD.createRoutingTables - vid2pid (aggregation)") -val numEdgePartitions = edges.partitions.size +val numEdgePartitions = edges.partitions.length vid2pid.partitionBy(vertexPartitioner).mapPartitions( iter => Iterator(RoutingTablePartition.fromMsgs(numEdgePartitions, iter)), preservesPartitioning = true) http://git-wip-us.apache.org/repos/asf/spark/blob/e97fc7f1/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
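Why the GraphX hunks prefer .length: RDD.partitions returns a plain Array, and Array has no size method of its own, so .size goes through an implicit conversion on every call, while .length reads the JVM array field directly. A sketch on a bare array:

    val parts = new Array[Int](8)

    parts.length  // direct read of the array's length field
    parts.size    // same value, but routed through an implicit ArrayOps wrapper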
spark git commit: [HOT-FIX] Recover some deprecations for 2.10 compatibility.
Repository: spark Updated Branches: refs/heads/master 7b25dc7b7 -> 02b7677e9 [HOT-FIX] Recover some deprecations for 2.10 compatibility. ## What changes were proposed in this pull request? #11479 [SPARK-13627] broke 2.10 compatibility: [2.10-Build](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-compile-maven-scala-2.10/292/console) At this moment, we need to support both 2.10 and 2.11. This PR recovers some deprecated methods which were replaced by [SPARK-13627]. ## How was this patch tested? Jenkins build: Both 2.10, 2.11. Author: Dongjoon Hyun. Closes #11488 from dongjoon-hyun/hotfix_compatibility_with_2.10. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/02b7677e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/02b7677e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/02b7677e Branch: refs/heads/master Commit: 02b7677e9584f5ccd68869abdb0bf980dc847ce1 Parents: 7b25dc7 Author: Dongjoon Hyun Authored: Thu Mar 3 09:53:02 2016 + Committer: Sean Owen Committed: Thu Mar 3 09:53:02 2016 + -- .../scala/org/apache/spark/examples/mllib/AbstractParams.scala | 2 +- .../scala/org/apache/spark/sql/catalyst/ScalaReflection.scala | 6 +++--- .../spark/sql/catalyst/expressions/codegen/package.scala | 2 +- .../main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/02b7677e/examples/src/main/scala/org/apache/spark/examples/mllib/AbstractParams.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/AbstractParams.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/AbstractParams.scala index 8985c85..ae60577 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/AbstractParams.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/AbstractParams.scala @@ -38,7 +38,7 @@ abstract class AbstractParams[T: TypeTag] { */ override def toString: String = { val tpe = tag.tpe -val allAccessors = tpe.decls.collect { +val allAccessors = tpe.declarations.collect { case m: MethodSymbol if m.isCaseAccessor => m } val mirror = runtimeMirror(getClass.getClassLoader) http://git-wip-us.apache.org/repos/asf/spark/blob/02b7677e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index 4f1911c..02cb2d9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -774,9 +774,9 @@ trait ScalaReflection { } protected def constructParams(tpe: Type): Seq[Symbol] = { -val constructorSymbol = tpe.member(termNames.CONSTRUCTOR) +val constructorSymbol = tpe.member(nme.CONSTRUCTOR) val params = if (constructorSymbol.isMethod) { - constructorSymbol.asMethod.paramLists + constructorSymbol.asMethod.paramss } else { // Find the primary constructor, and use its parameter ordering. 
val primaryConstructorSymbol: Option[Symbol] = constructorSymbol.asTerm.alternatives.find( @@ -784,7 +784,7 @@ trait ScalaReflection { if (primaryConstructorSymbol.isEmpty) { sys.error("Internal SQL error: Product object did not have a primary constructor.") } else { -primaryConstructorSymbol.get.asMethod.paramLists +primaryConstructorSymbol.get.asMethod.paramss } } params.flatten http://git-wip-us.apache.org/repos/asf/spark/blob/02b7677e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala index 382c718..41128fe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala @@ -51,7 +51,7 @@ package object codegen { val classLoader = generatedClass .getClassLoader - .asInstanceOf[scala.reflect.internal.util.AbstractFileClassLoader] +
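The API skew this hotfix works around, in one small runnable piece; the 2.10 spellings below also compile on 2.11 (as deprecated aliases), while the 2.11 renames do not exist on 2.10:

    import scala.reflect.runtime.universe._

    val tpe = typeOf[Some[Int]]

    // Scala 2.10 spellings, restored by this hotfix:
    val ctor = tpe.member(nme.CONSTRUCTOR).asMethod
    val paramLists210 = ctor.paramss
    val declared210 = tpe.declarations

    // Scala 2.11 renames, not available on 2.10:
    // tpe.member(termNames.CONSTRUCTOR).asMethod.paramLists
    // tpe.decls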
spark git commit: [SPARK-13466] [SQL] Remove projects that become redundant after column pruning rule
Repository: spark Updated Branches: refs/heads/master 1085bd862 -> 7b25dc7b7 [SPARK-13466] [SQL] Remove projects that become redundant after column pruning rule JIRA: https://issues.apache.org/jira/browse/SPARK-13466 ## What changes were proposed in this pull request? With the column pruning rule in the optimizer, some Project operators become redundant. We should remove these redundant Projects. For an example query: val input = LocalRelation('key.int, 'value.string) val query = Project(Seq($"x.key", $"y.key"), Join( SubqueryAlias("x", input), BroadcastHint(SubqueryAlias("y", input)), Inner, None)) After the first run of column pruning, it would look like: Project(Seq($"x.key", $"y.key"), Join( Project(Seq($"x.key"), SubqueryAlias("x", input)), Project(Seq($"y.key"), <-- inserted by the rule BroadcastHint(SubqueryAlias("y", input))), Inner, None)) Actually we don't need the outside Project now. This patch will remove it: Join( Project(Seq($"x.key"), SubqueryAlias("x", input)), Project(Seq($"y.key"), BroadcastHint(SubqueryAlias("y", input))), Inner, None) ## How was this patch tested? A unit test is added to ColumnPruningSuite. Author: Liang-Chi Hsieh. Closes #11341 from viirya/remove-redundant-project. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7b25dc7b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7b25dc7b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7b25dc7b Branch: refs/heads/master Commit: 7b25dc7b7e5a098552c0d640eee132b83d42db56 Parents: 1085bd8 Author: Liang-Chi Hsieh Authored: Thu Mar 3 00:06:46 2016 -0800 Committer: Davies Liu Committed: Thu Mar 3 00:06:46 2016 -0800 -- .../sql/catalyst/optimizer/Optimizer.scala | 6 - .../catalyst/optimizer/ColumnPruningSuite.scala | 23 +++- .../optimizer/JoinOptimizationSuite.scala | 9 .../sql/execution/metric/SQLMetricsSuite.scala | 8 +++ 4 files changed, 35 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7b25dc7b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 83ea302..059d8ff 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -312,6 +312,10 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper { * - LeftSemiJoin */ object ColumnPruning extends Rule[LogicalPlan] { + def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean = +output1.size == output2.size && + output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2)) + def apply(plan: LogicalPlan): LogicalPlan = plan transform { // Prunes the unused columns from project list of Project/Aggregate/Window/Expand case p @ Project(_, p2: Project) if (p2.outputSet -- p.references).nonEmpty => @@ -378,7 +382,7 @@ object ColumnPruning extends Rule[LogicalPlan] { case p @ Project(_, l: LeafNode) => p // Eliminate no-op Projects -case p @ Project(projectList, child) if child.output == p.output => child +case p @ Project(projectList, child) if sameOutput(child.output, p.output) => child // for all other logical plans that inherit the output from their children case p @ Project(_, child) => 
http://git-wip-us.apache.org/repos/asf/spark/blob/7b25dc7b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala index 5cab1fc..d09601e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{Ascending, Explode, Literal, SortOrder} -import org.apache.spark.sql.catalyst.plans.PlanTest
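For reference, the helper the patch adds, restated with the reasoning as comments; the description of semanticEquals is a reading of the catalyst internals of the time, not something the commit message itself states:

    import org.apache.spark.sql.catalyst.expressions.Attribute

    // Plain == on Attribute also compares cosmetic details (name, qualifiers,
    // metadata), so an output list can refer to exactly the same columns as
    // child.output and still fail the old `child.output == p.output` test.
    // semanticEquals keys on the underlying expression ids, which is the right
    // notion of "same column" when deciding that a Project is a no-op.
    def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean =
      output1.size == output2.size &&
        output1.zip(output2).forall { case (a1, a2) => a1.semanticEquals(a2) }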