spark git commit: [MINOR][CORE][TEST] Remove unnecessary sort in UnsafeInMemorySorterSuite
Repository: spark Updated Branches: refs/heads/branch-2.3 470cacd49 -> a2f65eb79 [MINOR][CORE][TEST] Remove unnecessary sort in UnsafeInMemorySorterSuite ## What changes were proposed in this pull request? We don't require specific ordering of the input data, the sort action is not necessary and misleading. ## How was this patch tested? Existing test suite. Author: Xingbo Jiang Closes #21536 from jiangxb1987/sorterSuite. (cherry picked from commit 534065efeb51ff0d308fa6cc9dea0715f8ce25ad) Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a2f65eb7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a2f65eb7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a2f65eb7 Branch: refs/heads/branch-2.3 Commit: a2f65eb794c123e967e45f00f8b74dff24d2a28c Parents: 470cacd Author: Xingbo Jiang Authored: Thu Jun 14 14:20:48 2018 +0800 Committer: hyukjinkwon Committed: Thu Jun 14 14:22:11 2018 +0800 -- .../util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java | 1 - 1 file changed, 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a2f65eb7/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java -- diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java index c145532..85ffdca 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java @@ -129,7 +129,6 @@ public class UnsafeInMemorySorterSuite { final UnsafeSorterIterator iter = sorter.getSortedIterator(); int iterLength = 0; long prevPrefix = -1; -Arrays.sort(dataToSort); while (iter.hasNext()) { iter.loadNext(); final String str = - To unsubscribe, e-mail: 
commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [MINOR][CORE][TEST] Remove unnecessary sort in UnsafeInMemorySorterSuite
Repository: spark Updated Branches: refs/heads/master 3bf76918f -> 534065efe [MINOR][CORE][TEST] Remove unnecessary sort in UnsafeInMemorySorterSuite ## What changes were proposed in this pull request? We don't require specific ordering of the input data, the sort action is not necessary and misleading. ## How was this patch tested? Existing test suite. Author: Xingbo Jiang Closes #21536 from jiangxb1987/sorterSuite. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/534065ef Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/534065ef Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/534065ef Branch: refs/heads/master Commit: 534065efeb51ff0d308fa6cc9dea0715f8ce25ad Parents: 3bf7691 Author: Xingbo Jiang Authored: Thu Jun 14 14:20:48 2018 +0800 Committer: hyukjinkwon Committed: Thu Jun 14 14:20:48 2018 +0800 -- .../util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java | 1 - 1 file changed, 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/534065ef/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java -- diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java index c145532..85ffdca 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java @@ -129,7 +129,6 @@ public class UnsafeInMemorySorterSuite { final UnsafeSorterIterator iter = sorter.getSortedIterator(); int iterLength = 0; long prevPrefix = -1; -Arrays.sort(dataToSort); while (iter.hasNext()) { iter.loadNext(); final String str = - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r27442 - in /dev/spark/2.4.0-SNAPSHOT-2018_06_13_16_01-3bf7691-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Jun 13 23:15:41 2018 New Revision: 27442 Log: Apache Spark 2.4.0-SNAPSHOT-2018_06_13_16_01-3bf7691 docs [This commit notification would consist of 1467 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23732][DOCS] Fix source links in generated scaladoc.
Repository: spark Updated Branches: refs/heads/branch-2.2 1f81ade0c -> 090b883fa [SPARK-23732][DOCS] Fix source links in generated scaladoc. Apply the suggestion on the bug to fix source links. Tested with the 2.3.1 release docs. Author: Marcelo Vanzin Closes #21521 from vanzin/SPARK-23732. (cherry picked from commit dc22465f3e1ef5ad59306b1f591d6fd16d674eb7) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/090b883f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/090b883f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/090b883f Branch: refs/heads/branch-2.2 Commit: 090b883fa7e0c4bf7f6abb3a5e8f82161fe3fb50 Parents: 1f81ade Author: Marcelo Vanzin Authored: Tue Jun 12 09:32:14 2018 +0800 Committer: Marcelo Vanzin Committed: Wed Jun 13 16:12:20 2018 -0700 -- project/SparkBuild.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/090b883f/project/SparkBuild.scala -- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index aa46995..9bc9d2a 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -713,7 +713,8 @@ object Unidoc { scalacOptions in (ScalaUnidoc, unidoc) ++= Seq( "-groups", // Group similar methods together based on the @group annotation. - "-skip-packages", "org.apache.hadoop" + "-skip-packages", "org.apache.hadoop", + "-sourcepath", (baseDirectory in ThisBuild).value.getAbsolutePath ) ++ ( // Add links to sources when generating Scaladoc for a non-snapshot release if (!isSnapshot.value) { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23732][DOCS] Fix source links in generated scaladoc.
Repository: spark Updated Branches: refs/heads/branch-2.1 d405e6657 -> 858e89b43 [SPARK-23732][DOCS] Fix source links in generated scaladoc. Apply the suggestion on the bug to fix source links. Tested with the 2.3.1 release docs. Author: Marcelo Vanzin Closes #21521 from vanzin/SPARK-23732. (cherry picked from commit dc22465f3e1ef5ad59306b1f591d6fd16d674eb7) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/858e89b4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/858e89b4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/858e89b4 Branch: refs/heads/branch-2.1 Commit: 858e89b43db6af72d584213bb7c11ac6687666f6 Parents: d405e66 Author: Marcelo Vanzin Authored: Tue Jun 12 09:32:14 2018 +0800 Committer: Marcelo Vanzin Committed: Wed Jun 13 16:12:50 2018 -0700 -- project/SparkBuild.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/858e89b4/project/SparkBuild.scala -- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index e772fa0..6afc5f0 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -757,7 +757,8 @@ object Unidoc { scalacOptions in (ScalaUnidoc, unidoc) ++= Seq( "-groups", // Group similar methods together based on the @group annotation. - "-skip-packages", "org.apache.hadoop" + "-skip-packages", "org.apache.hadoop", + "-sourcepath", (baseDirectory in ThisBuild).value.getAbsolutePath ) ++ ( // Add links to sources when generating Scaladoc for a non-snapshot release if (!isSnapshot.value) { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-24531][TESTS] Replace 2.3.0 version with 2.3.1
Repository: spark Updated Branches: refs/heads/master 1b46f41c5 -> 3bf76918f [SPARK-24531][TESTS] Replace 2.3.0 version with 2.3.1 ## What changes were proposed in this pull request? The PR updates the 2.3 version tested to the new release 2.3.1. ## How was this patch tested? existing UTs Author: Marco Gaido Closes #21543 from mgaido91/patch-1. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3bf76918 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3bf76918 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3bf76918 Branch: refs/heads/master Commit: 3bf76918fb67fb3ee9aed254d4fb3b87a7e66117 Parents: 1b46f41 Author: Marco Gaido Authored: Wed Jun 13 15:18:19 2018 -0700 Committer: Xiao Li Committed: Wed Jun 13 15:18:19 2018 -0700 -- .../apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3bf76918/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala index 6f904c9..5149218 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala @@ -195,7 +195,7 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils { object PROCESS_TABLES extends QueryTest with SQLTestUtils { // Tests the latest version of every release line. - val testingVersions = Seq("2.0.2", "2.1.2", "2.2.1", "2.3.0") + val testingVersions = Seq("2.0.2", "2.1.2", "2.2.1", "2.3.1") protected var spark: SparkSession = _ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-24235][SS] Implement continuous shuffle writer for single reader partition.
Repository: spark Updated Branches: refs/heads/master 299d297e2 -> 1b46f41c5 [SPARK-24235][SS] Implement continuous shuffle writer for single reader partition. ## What changes were proposed in this pull request? https://docs.google.com/document/d/1IL4kJoKrZWeyIhklKUJqsW-yEN7V7aL05MmM65AYOfE/edit Implement continuous shuffle write RDD for a single reader partition. (I don't believe any implementation changes are actually required for multiple reader partitions, but this PR is already very large, so I want to exclude those for now to keep the size down.) ## How was this patch tested? new unit tests Author: Jose Torres Closes #21428 from jose-torres/writerTask. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b46f41c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b46f41c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b46f41c Branch: refs/heads/master Commit: 1b46f41c55f5cd29956e17d7da95a95580cf273f Parents: 299d297 Author: Jose Torres Authored: Wed Jun 13 13:13:01 2018 -0700 Committer: Shixiong Zhu Committed: Wed Jun 13 13:13:01 2018 -0700 -- .../shuffle/ContinuousShuffleReadRDD.scala | 6 +- .../shuffle/ContinuousShuffleWriter.scala | 27 ++ .../shuffle/RPCContinuousShuffleReader.scala| 138 ++ .../shuffle/RPCContinuousShuffleWriter.scala| 60 +++ .../continuous/shuffle/UnsafeRowReceiver.scala | 138 -- .../shuffle/ContinuousShuffleReadSuite.scala| 291 - .../shuffle/ContinuousShuffleSuite.scala| 416 +++ 7 files changed, 645 insertions(+), 431 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1b46f41c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala index 
801b28b..cf6572d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala @@ -34,8 +34,10 @@ case class ContinuousShuffleReadPartition( // Initialized only on the executor, and only once even as we call compute() multiple times. lazy val (reader: ContinuousShuffleReader, endpoint) = { val env = SparkEnv.get.rpcEnv -val receiver = new UnsafeRowReceiver(queueSize, numShuffleWriters, epochIntervalMs, env) -val endpoint = env.setupEndpoint(s"UnsafeRowReceiver-${UUID.randomUUID()}", receiver) +val receiver = new RPCContinuousShuffleReader( + queueSize, numShuffleWriters, epochIntervalMs, env) +val endpoint = env.setupEndpoint(s"RPCContinuousShuffleReader-${UUID.randomUUID()}", receiver) + TaskContext.get().addTaskCompletionListener { ctx => env.stop(endpoint) } http://git-wip-us.apache.org/repos/asf/spark/blob/1b46f41c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleWriter.scala new file mode 100644 index 000..47b1f78 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleWriter.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous.shuffle + +import org.apache.spark.sql.catalyst.expressions.UnsafeRow + +/** + * Trait for writing to a continuo
svn commit: r27423 - in /dev/spark/2.4.0-SNAPSHOT-2018_06_13_08_01-299d297-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Jun 13 15:16:31 2018 New Revision: 27423 Log: Apache Spark 2.4.0-SNAPSHOT-2018_06_13_08_01-299d297 docs [This commit notification would consist of 1467 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-24500][SQL] Make sure streams are materialized during Tree transforms.
Repository: spark Updated Branches: refs/heads/master 7703b46d2 -> 299d297e2 [SPARK-24500][SQL] Make sure streams are materialized during Tree transforms. ## What changes were proposed in this pull request? If you construct catalyst trees using `scala.collection.immutable.Stream` you can run into situations where valid transformations do not seem to have any effect. There are two causes for this behavior: - `Stream` is evaluated lazily. Note that default implementation will generally only evaluate a function for the first element (this makes testing a bit tricky). - `TreeNode` and `QueryPlan` use side effects to detect if a tree has changed. Mapping over a stream is lazy and does not need to trigger this side effect. If this happens the node will invalidly assume that it did not change and return itself instead if the newly created node (this is for GC reasons). This PR fixes this issue by forcing materialization on streams in `TreeNode` and `QueryPlan`. ## How was this patch tested? Unit tests were added to `TreeNodeSuite` and `LogicalPlanSuite`. An integration test was added to the `PlannerSuite` Author: Herman van Hovell Closes #21539 from hvanhovell/SPARK-24500. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/299d297e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/299d297e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/299d297e Branch: refs/heads/master Commit: 299d297e250ca3d46616a97e4256aa9ad6a135e5 Parents: 7703b46 Author: Herman van Hovell Authored: Wed Jun 13 07:09:48 2018 -0700 Committer: Wenchen Fan Committed: Wed Jun 13 07:09:48 2018 -0700 -- .../spark/sql/catalyst/plans/QueryPlan.scala| 1 + .../spark/sql/catalyst/trees/TreeNode.scala | 122 +-- .../sql/catalyst/plans/LogicalPlanSuite.scala | 20 ++- .../sql/catalyst/trees/TreeNodeSuite.scala | 25 +++- .../spark/sql/execution/PlannerSuite.scala | 11 +- 5 files changed, 109 insertions(+), 70 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/299d297e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 64cb8c7..e431c95 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -119,6 +119,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT case Some(value) => Some(recursiveTransform(value)) case m: Map[_, _] => m case d: DataType => d // Avoid unpacking Structs + case stream: Stream[_] => stream.map(recursiveTransform).force case seq: Traversable[_] => seq.map(recursiveTransform) case other: AnyRef => other case null => null http://git-wip-us.apache.org/repos/asf/spark/blob/299d297e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 9c7d47f..becfa8d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -199,44 +199,33 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { var changed = false val remainingNewChildren = newChildren.toBuffer val remainingOldChildren = children.toBuffer +def mapTreeNode(node: TreeNode[_]): TreeNode[_] = { + val newChild = remainingNewChildren.remove(0) + val oldChild = remainingOldChildren.remove(0) + if (newChild fastEquals oldChild) { +oldChild + } else { +changed = true +newChild + } +} +def mapChild(child: Any): Any = child match { + case arg: TreeNode[_] if containsChild(arg) => mapTreeNode(arg) + case nonChild: AnyRef => nonChild + case null => null +} val newArgs = mapProductIterator { case s: StructType => s // Don't convert struct types to some other type of Seq[StructField] // Handle Seq[TreeNode] in TreeNode parameters. - case s: Seq[_] => s.map { -case arg: TreeNode[_] if containsChild(arg) => - val newChild = remainingNewChildren.remove(0) - val oldChild = remainingOldChildren.remove(0) - if (newChild fastEquals oldChild) { -oldC
spark git commit: [SPARK-24479][SS] Added config for registering streamingQueryListeners
Repository: spark Updated Branches: refs/heads/master 4c388bccf -> 7703b46d2 [SPARK-24479][SS] Added config for registering streamingQueryListeners ## What changes were proposed in this pull request? Currently a "StreamingQueryListener" can only be registered programmatically. We could have a new config "spark.sql.streamingQueryListeners" similar to "spark.sql.queryExecutionListeners" and "spark.extraListeners" for users to register custom streaming listeners. ## How was this patch tested? New unit test and running example programs. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Arun Mahadevan Closes #21504 from arunmahadevan/SPARK-24480. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7703b46d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7703b46d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7703b46d Branch: refs/heads/master Commit: 7703b46d2843db99e28110c4c7ccf60934412504 Parents: 4c388bc Author: Arun Mahadevan Authored: Wed Jun 13 20:43:16 2018 +0800 Committer: hyukjinkwon Committed: Wed Jun 13 20:43:16 2018 +0800 -- .../spark/sql/internal/StaticSQLConf.scala | 8 +++ .../sql/streaming/StreamingQueryManager.scala | 15 + .../StreamingQueryListenersConfSuite.scala | 66 3 files changed, 89 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7703b46d/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala index fe0ad39..382ef28 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala @@ -96,6 +96,14 @@ object StaticSQLConf { .toSequence .createOptional + val STREAMING_QUERY_LISTENERS = 
buildStaticConf("spark.sql.streaming.streamingQueryListeners") +.doc("List of class names implementing StreamingQueryListener that will be automatically " + + "added to newly created sessions. The classes should have either a no-arg constructor, " + + "or a constructor that expects a SparkConf argument.") +.stringConf +.toSequence +.createOptional + val UI_RETAINED_EXECUTIONS = buildStaticConf("spark.sql.ui.retainedExecutions") .doc("Number of executions to retain in the Spark UI.") http://git-wip-us.apache.org/repos/asf/spark/blob/7703b46d/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 97da2b1..25bb052 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -24,6 +24,7 @@ import scala.collection.mutable import org.apache.hadoop.fs.Path +import org.apache.spark.SparkException import org.apache.spark.annotation.InterfaceStability import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession} @@ -32,6 +33,7 @@ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, ContinuousTrigger} import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.StaticSQLConf.STREAMING_QUERY_LISTENERS import org.apache.spark.sql.sources.v2.StreamWriteSupport import org.apache.spark.util.{Clock, SystemClock, Utils} @@ -55,6 +57,19 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo @GuardedBy("awaitTerminationLock") private var lastTerminatedQuery: StreamingQuery = null + try { 
+sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames => + Utils.loadExtensions(classOf[StreamingQueryListener], classNames, +sparkSession.sparkContext.conf).foreach(listener => { +addListener(listener) +logInfo(s"Registered listener ${listener.getClass.getName}") + }) +} + } catch { +case e: Exception => + throw new SparkException("Exception when registering StreamingQueryListener", e) + } + /** * Returns a list of active queries associated with this SQLContext * http://git-wip-us.apache.o
svn commit: r27416 - in /dev/spark/2.4.0-SNAPSHOT-2018_06_13_00_01-4c388bc-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Jun 13 07:17:21 2018 New Revision: 27416 Log: Apache Spark 2.4.0-SNAPSHOT-2018_06_13_00_01-4c388bc docs [This commit notification would consist of 1467 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org